// data_preprocess/parquet_store.rs

1//! Parquet-based storage backend for tick and bar data.
2//!
3//! Uses Hive-style directory partitioning:
4//!   {root}/ticks/exchange={ex}/symbol={sym}/{date}.parquet
5//!   {root}/bars/exchange={ex}/symbol={sym}/timeframe={tf}/{date}.parquet
6
7use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10
11use chrono::NaiveDateTime;
12use polars::prelude::*;
13
14use crate::convert::{
15    bars_to_dataframe, dataframe_to_bars, dataframe_to_ticks, ndt_to_date_string,
16    ticks_to_dataframe,
17};
18use crate::error::{DataError, Result};
19use crate::models::{Bar, BarQueryOpts, QueryOpts, StatRow, Tick};
20
/// Parquet-based storage backend for tick and bar data.
///
/// Data lives under `root` in Hive-style partition directories (see the
/// module docs): ticks under `ticks/exchange=…/symbol=…/` and bars under
/// `bars/exchange=…/symbol=…/timeframe=…/`, one Parquet file per date.
pub struct ParquetStore {
    // Root directory of the store; created by `open` if it does not exist.
    root: PathBuf,
}
25
26impl ParquetStore {
27    /// Open a Parquet data store rooted at the given directory, creating it if needed.
28    pub fn open(root: impl AsRef<Path>) -> Result<Self> {
29        let root = root.as_ref().to_path_buf();
30        fs::create_dir_all(&root)?;
31        Ok(Self { root })
32    }
33
34    // ── Import ──────────────────────────────────────────────────
35
36    /// Import ticks, deduplicating against existing data per date partition.
37    /// Returns the number of rows actually inserted (after dedup).
38    pub fn insert_ticks(&self, ticks: &[Tick]) -> Result<usize> {
39        if ticks.is_empty() {
40            return Ok(0);
41        }
42
43        // Group ticks by (exchange, symbol, date)
44        let mut groups: HashMap<(String, String, String), Vec<&Tick>> = HashMap::new();
45        for tick in ticks {
46            let date = ndt_to_date_string(&tick.ts);
47            let key = (tick.exchange.clone(), tick.symbol.clone(), date);
48            groups.entry(key).or_default().push(tick);
49        }
50
51        let mut total_inserted = 0usize;
52
53        for ((exchange, symbol, date), group_ticks) in &groups {
54            let dir = self.tick_dir(exchange, symbol);
55            fs::create_dir_all(&dir)?;
56            let file_path = dir.join(format!("{date}.parquet"));
57
58            let owned: Vec<Tick> = group_ticks.iter().map(|t| (*t).clone()).collect();
59            let new_df = ticks_to_dataframe(&owned)?;
60
61            if file_path.exists() {
62                let existing_df = read_parquet_file(&file_path)?;
63                let existing_count = existing_df.height();
64                let combined = concat_and_dedup_ticks(existing_df, new_df)?;
65                total_inserted += combined.height().saturating_sub(existing_count);
66                write_parquet_file(&file_path, &mut combined.clone())?;
67            } else {
68                let deduped = dedup_ticks(new_df)?;
69                total_inserted += deduped.height();
70                write_parquet_file(&file_path, &mut deduped.clone())?;
71            }
72        }
73
74        Ok(total_inserted)
75    }
76
77    /// Import bars, deduplicating against existing data per date partition.
78    /// Returns the number of rows actually inserted (after dedup).
79    pub fn insert_bars(&self, bars: &[Bar]) -> Result<usize> {
80        if bars.is_empty() {
81            return Ok(0);
82        }
83
84        // Group bars by (exchange, symbol, timeframe, date)
85        let mut groups: HashMap<(String, String, String, String), Vec<&Bar>> = HashMap::new();
86        for bar in bars {
87            let date = ndt_to_date_string(&bar.ts);
88            let key = (
89                bar.exchange.clone(),
90                bar.symbol.clone(),
91                bar.timeframe.as_str().to_string(),
92                date,
93            );
94            groups.entry(key).or_default().push(bar);
95        }
96
97        let mut total_inserted = 0usize;
98
99        for ((exchange, symbol, timeframe, date), group_bars) in &groups {
100            let dir = self.bar_dir(&exchange, &symbol, &timeframe);
101            fs::create_dir_all(&dir)?;
102            let file_path = dir.join(format!("{date}.parquet"));
103
104            let owned: Vec<Bar> = group_bars.iter().map(|b| (*b).clone()).collect();
105            let new_df = bars_to_dataframe(&owned)?;
106
107            if file_path.exists() {
108                let existing_df = read_parquet_file(&file_path)?;
109                let existing_count = existing_df.height();
110                let combined = concat_and_dedup_bars(existing_df, new_df)?;
111                total_inserted += combined.height().saturating_sub(existing_count);
112                write_parquet_file(&file_path, &mut combined.clone())?;
113            } else {
114                let deduped = dedup_bars(new_df)?;
115                total_inserted += deduped.height();
116                write_parquet_file(&file_path, &mut deduped.clone())?;
117            }
118        }
119
120        Ok(total_inserted)
121    }
122
123    // ── Query ───────────────────────────────────────────────────
124
125    /// Query ticks for a given exchange+symbol, optionally filtered by date range.
126    /// Returns (ticks, total_count_matching_filters).
127    pub fn query_ticks(&self, opts: &QueryOpts) -> Result<(Vec<Tick>, u64)> {
128        let dir = self.tick_dir(&opts.exchange, &opts.symbol);
129        if !dir.exists() {
130            return Ok((Vec::new(), 0));
131        }
132
133        let files = list_date_files(&dir, opts.from, opts.to)?;
134        if files.is_empty() {
135            return Ok((Vec::new(), 0));
136        }
137
138        let mut all_dfs: Vec<DataFrame> = Vec::new();
139        for file in &files {
140            let df = read_parquet_file(file)?;
141            all_dfs.push(df);
142        }
143        let mut combined = concat_dataframes(all_dfs)?;
144
145        // Apply timestamp filters
146        combined = apply_ts_filter(combined, opts.from, opts.to)?;
147
148        // Sort by ts ascending
149        combined = combined.sort(["ts"], SortMultipleOptions::default())?;
150
151        let total = combined.height() as u64;
152
153        // Apply limit/tail/descending
154        combined = apply_pagination(combined, opts.limit, opts.tail, opts.descending)?;
155
156        let ticks = dataframe_to_ticks(&combined)?;
157        Ok((ticks, total))
158    }
159
160    /// Query bars for a given exchange+symbol+timeframe, optionally filtered by date range.
161    /// Returns (bars, total_count_matching_filters).
162    pub fn query_bars(&self, opts: &BarQueryOpts) -> Result<(Vec<Bar>, u64)> {
163        let dir = self.bar_dir(&opts.exchange, &opts.symbol, &opts.timeframe);
164        if !dir.exists() {
165            return Ok((Vec::new(), 0));
166        }
167
168        let files = list_date_files(&dir, opts.from, opts.to)?;
169        if files.is_empty() {
170            return Ok((Vec::new(), 0));
171        }
172
173        let mut all_dfs: Vec<DataFrame> = Vec::new();
174        for file in &files {
175            let df = read_parquet_file(file)?;
176            all_dfs.push(df);
177        }
178        let mut combined = concat_dataframes(all_dfs)?;
179
180        // Apply timestamp filters
181        combined = apply_ts_filter(combined, opts.from, opts.to)?;
182
183        // Sort by ts ascending
184        combined = combined.sort(["ts"], SortMultipleOptions::default())?;
185
186        let total = combined.height() as u64;
187
188        // Apply limit/tail/descending
189        combined = apply_pagination(combined, opts.limit, opts.tail, opts.descending)?;
190
191        let bars = dataframe_to_bars(&combined)?;
192        Ok((bars, total))
193    }
194
195    // ── Delete ──────────────────────────────────────────────────
196
197    /// Delete ticks matching exchange+symbol, optionally within a date range.
198    pub fn delete_ticks(
199        &self,
200        exchange: &str,
201        symbol: &str,
202        from: Option<NaiveDateTime>,
203        to: Option<NaiveDateTime>,
204    ) -> Result<usize> {
205        let dir = self.tick_dir(exchange, symbol);
206        if !dir.exists() {
207            return Ok(0);
208        }
209        delete_from_partition(&dir, from, to)
210    }
211
212    /// Delete bars matching exchange+symbol+timeframe, optionally within a date range.
213    pub fn delete_bars(
214        &self,
215        exchange: &str,
216        symbol: &str,
217        timeframe: &str,
218        from: Option<NaiveDateTime>,
219        to: Option<NaiveDateTime>,
220    ) -> Result<usize> {
221        let dir = self.bar_dir(exchange, symbol, timeframe);
222        if !dir.exists() {
223            return Ok(0);
224        }
225        delete_from_partition(&dir, from, to)
226    }
227
228    /// Delete ALL data (ticks + bars) for an exchange+symbol pair.
229    pub fn delete_symbol(&self, exchange: &str, symbol: &str) -> Result<(usize, usize)> {
230        let tick_count = self.count_rows_in_dir(&self.tick_dir(exchange, symbol));
231        let bar_count = self.count_all_bars_for_symbol(exchange, symbol);
232
233        // Remove tick directory
234        let tick_dir = self.tick_dir(exchange, symbol);
235        if tick_dir.exists() {
236            fs::remove_dir_all(&tick_dir)?;
237        }
238
239        // Remove bar directories for all timeframes
240        let bar_sym_dir = self
241            .root
242            .join("bars")
243            .join(format!("exchange={exchange}"))
244            .join(format!("symbol={symbol}"));
245        if bar_sym_dir.exists() {
246            fs::remove_dir_all(&bar_sym_dir)?;
247        }
248
249        Ok((tick_count, bar_count))
250    }
251
252    /// Delete ALL data for an entire exchange.
253    pub fn delete_exchange(&self, exchange: &str) -> Result<(usize, usize)> {
254        let tick_ex_dir = self.root.join("ticks").join(format!("exchange={exchange}"));
255        let bar_ex_dir = self.root.join("bars").join(format!("exchange={exchange}"));
256
257        let tick_count = self.count_rows_recursive(&tick_ex_dir);
258        let bar_count = self.count_rows_recursive(&bar_ex_dir);
259
260        if tick_ex_dir.exists() {
261            fs::remove_dir_all(&tick_ex_dir)?;
262        }
263        if bar_ex_dir.exists() {
264            fs::remove_dir_all(&bar_ex_dir)?;
265        }
266
267        Ok((tick_count, bar_count))
268    }
269
270    // ── Stats ───────────────────────────────────────────────────
271
272    /// Summary statistics across all data, optionally filtered by exchange and/or symbol.
273    pub fn stats(&self, exchange: Option<&str>, symbol: Option<&str>) -> Result<Vec<StatRow>> {
274        let mut rows = Vec::new();
275
276        // Collect tick stats
277        self.collect_tick_stats(&mut rows, exchange, symbol)?;
278
279        // Collect bar stats
280        self.collect_bar_stats(&mut rows, exchange, symbol)?;
281
282        // Sort by exchange, symbol, data_type
283        rows.sort_by(|a, b| {
284            a.exchange
285                .cmp(&b.exchange)
286                .then(a.symbol.cmp(&b.symbol))
287                .then(a.data_type.cmp(&b.data_type))
288        });
289
290        Ok(rows)
291    }
292
293    /// Total size of all Parquet files under the data root (bytes).
294    pub fn total_size(&self) -> Option<u64> {
295        let mut total = 0u64;
296        for entry in walkdir(&self.root) {
297            if entry.extension().map_or(false, |e| e == "parquet") {
298                if let Ok(meta) = fs::metadata(&entry) {
299                    total += meta.len();
300                }
301            }
302        }
303        if total == 0 { None } else { Some(total) }
304    }
305
306    // ── Private helpers ─────────────────────────────────────────
307
308    /// Build tick directory path for a given exchange+symbol.
309    fn tick_dir(&self, exchange: &str, symbol: &str) -> PathBuf {
310        self.root
311            .join("ticks")
312            .join(format!("exchange={exchange}"))
313            .join(format!("symbol={symbol}"))
314    }
315
316    /// Build bar directory path for a given exchange+symbol+timeframe.
317    fn bar_dir(&self, exchange: &str, symbol: &str, timeframe: &str) -> PathBuf {
318        self.root
319            .join("bars")
320            .join(format!("exchange={exchange}"))
321            .join(format!("symbol={symbol}"))
322            .join(format!("timeframe={timeframe}"))
323    }
324
325    /// Count total rows across all parquet files in a directory.
326    fn count_rows_in_dir(&self, dir: &Path) -> usize {
327        if !dir.exists() {
328            return 0;
329        }
330        let mut count = 0;
331        if let Ok(entries) = fs::read_dir(dir) {
332            for entry in entries.flatten() {
333                let path = entry.path();
334                if path.extension().map_or(false, |e| e == "parquet") {
335                    if let Ok(df) = read_parquet_file(&path) {
336                        count += df.height();
337                    }
338                }
339            }
340        }
341        count
342    }
343
344    /// Count total rows recursively across all parquet files under a directory.
345    fn count_rows_recursive(&self, dir: &Path) -> usize {
346        if !dir.exists() {
347            return 0;
348        }
349        let mut count = 0;
350        for path in walkdir(dir) {
351            if path.extension().map_or(false, |e| e == "parquet") {
352                if let Ok(df) = read_parquet_file(&path) {
353                    count += df.height();
354                }
355            }
356        }
357        count
358    }
359
360    /// Count all bar rows for a given exchange+symbol across all timeframes.
361    fn count_all_bars_for_symbol(&self, exchange: &str, symbol: &str) -> usize {
362        let bar_sym_dir = self
363            .root
364            .join("bars")
365            .join(format!("exchange={exchange}"))
366            .join(format!("symbol={symbol}"));
367        self.count_rows_recursive(&bar_sym_dir)
368    }
369
370    /// Collect tick stats from the directory tree.
371    fn collect_tick_stats(
372        &self,
373        rows: &mut Vec<StatRow>,
374        exchange_filter: Option<&str>,
375        symbol_filter: Option<&str>,
376    ) -> Result<()> {
377        let ticks_dir = self.root.join("ticks");
378        if !ticks_dir.exists() {
379            return Ok(());
380        }
381
382        for (exchange, symbol, dir) in self.iter_exchange_symbol_dirs(&ticks_dir)? {
383            if let Some(ef) = exchange_filter {
384                if exchange != ef {
385                    continue;
386                }
387            }
388            if let Some(sf) = symbol_filter {
389                if symbol != sf {
390                    continue;
391                }
392            }
393
394            let (count, ts_min, ts_max) = self.aggregate_parquet_stats(&dir)?;
395            if count > 0 {
396                rows.push(StatRow {
397                    exchange,
398                    symbol,
399                    data_type: "tick".to_string(),
400                    count,
401                    ts_min: ts_min.unwrap_or_default(),
402                    ts_max: ts_max.unwrap_or_default(),
403                });
404            }
405        }
406
407        Ok(())
408    }
409
410    /// Collect bar stats from the directory tree.
411    fn collect_bar_stats(
412        &self,
413        rows: &mut Vec<StatRow>,
414        exchange_filter: Option<&str>,
415        symbol_filter: Option<&str>,
416    ) -> Result<()> {
417        let bars_dir = self.root.join("bars");
418        if !bars_dir.exists() {
419            return Ok(());
420        }
421
422        for (exchange, symbol, timeframe, dir) in self.iter_exchange_symbol_tf_dirs(&bars_dir)? {
423            if let Some(ef) = exchange_filter {
424                if exchange != ef {
425                    continue;
426                }
427            }
428            if let Some(sf) = symbol_filter {
429                if symbol != sf {
430                    continue;
431                }
432            }
433
434            let (count, ts_min, ts_max) = self.aggregate_parquet_stats(&dir)?;
435            if count > 0 {
436                rows.push(StatRow {
437                    exchange,
438                    symbol,
439                    data_type: format!("bar ({timeframe})"),
440                    count,
441                    ts_min: ts_min.unwrap_or_default(),
442                    ts_max: ts_max.unwrap_or_default(),
443                });
444            }
445        }
446
447        Ok(())
448    }
449
450    /// Iterate over exchange/symbol directories under a top-level dir.
451    fn iter_exchange_symbol_dirs(&self, base: &Path) -> Result<Vec<(String, String, PathBuf)>> {
452        let mut result = Vec::new();
453        if !base.exists() {
454            return Ok(result);
455        }
456
457        for ex_entry in fs::read_dir(base)?.flatten() {
458            let ex_path = ex_entry.path();
459            if !ex_path.is_dir() {
460                continue;
461            }
462            let exchange =
463                parse_partition_value(ex_path.file_name().unwrap().to_str().unwrap_or(""));
464            if exchange.is_empty() {
465                continue;
466            }
467
468            for sym_entry in fs::read_dir(&ex_path)?.flatten() {
469                let sym_path = sym_entry.path();
470                if !sym_path.is_dir() {
471                    continue;
472                }
473                let symbol =
474                    parse_partition_value(sym_path.file_name().unwrap().to_str().unwrap_or(""));
475                if symbol.is_empty() {
476                    continue;
477                }
478                result.push((exchange.clone(), symbol, sym_path));
479            }
480        }
481
482        Ok(result)
483    }
484
485    /// Iterate over exchange/symbol/timeframe directories under a top-level dir.
486    fn iter_exchange_symbol_tf_dirs(
487        &self,
488        base: &Path,
489    ) -> Result<Vec<(String, String, String, PathBuf)>> {
490        let mut result = Vec::new();
491        if !base.exists() {
492            return Ok(result);
493        }
494
495        for ex_entry in fs::read_dir(base)?.flatten() {
496            let ex_path = ex_entry.path();
497            if !ex_path.is_dir() {
498                continue;
499            }
500            let exchange =
501                parse_partition_value(ex_path.file_name().unwrap().to_str().unwrap_or(""));
502            if exchange.is_empty() {
503                continue;
504            }
505
506            for sym_entry in fs::read_dir(&ex_path)?.flatten() {
507                let sym_path = sym_entry.path();
508                if !sym_path.is_dir() {
509                    continue;
510                }
511                let symbol =
512                    parse_partition_value(sym_path.file_name().unwrap().to_str().unwrap_or(""));
513                if symbol.is_empty() {
514                    continue;
515                }
516
517                for tf_entry in fs::read_dir(&sym_path)?.flatten() {
518                    let tf_path = tf_entry.path();
519                    if !tf_path.is_dir() {
520                        continue;
521                    }
522                    let timeframe =
523                        parse_partition_value(tf_path.file_name().unwrap().to_str().unwrap_or(""));
524                    if timeframe.is_empty() {
525                        continue;
526                    }
527                    result.push((exchange.clone(), symbol.clone(), timeframe, tf_path));
528                }
529            }
530        }
531
532        Ok(result)
533    }
534
535    /// Read all parquet files in a directory and aggregate row count + min/max ts.
536    fn aggregate_parquet_stats(
537        &self,
538        dir: &Path,
539    ) -> Result<(u64, Option<NaiveDateTime>, Option<NaiveDateTime>)> {
540        let mut total_count = 0u64;
541        let mut global_min: Option<i64> = None;
542        let mut global_max: Option<i64> = None;
543
544        if !dir.exists() {
545            return Ok((0, None, None));
546        }
547
548        for entry in fs::read_dir(dir)?.flatten() {
549            let path = entry.path();
550            if path.extension().map_or(false, |e| e == "parquet") {
551                let df = read_parquet_file(&path)?;
552                total_count += df.height() as u64;
553
554                if df.height() > 0 {
555                    let ts_col = df.column("ts").ok().and_then(|c| c.datetime().ok());
556                    if let Some(ts) = ts_col {
557                        if let Some(min_val) = ts.min() {
558                            global_min =
559                                Some(global_min.map_or(min_val, |cur: i64| cur.min(min_val)));
560                        }
561                        if let Some(max_val) = ts.max() {
562                            global_max =
563                                Some(global_max.map_or(max_val, |cur: i64| cur.max(max_val)));
564                        }
565                    }
566                }
567            }
568        }
569
570        let ts_min = global_min.map(micros_to_ndt);
571        let ts_max = global_max.map(micros_to_ndt);
572
573        Ok((total_count, ts_min, ts_max))
574    }
575}
576
577// ── Free functions ──────────────────────────────────────────────
578
/// Parse a Hive partition value from a directory name like "exchange=ctrader".
/// Returns an empty string when the name has no '=' separator.
fn parse_partition_value(dir_name: &str) -> String {
    match dir_name.split_once('=') {
        Some((_key, value)) => value.to_string(),
        None => String::new(),
    }
}
586
587/// List parquet files in a directory, optionally filtered by date range in filename.
588fn list_date_files(
589    dir: &Path,
590    from: Option<NaiveDateTime>,
591    to: Option<NaiveDateTime>,
592) -> Result<Vec<PathBuf>> {
593    let mut files = Vec::new();
594    let from_date = from.map(|d| d.format("%Y-%m-%d").to_string());
595    let to_date = to.map(|d| d.format("%Y-%m-%d").to_string());
596
597    for entry in fs::read_dir(dir)?.flatten() {
598        let path = entry.path();
599        if path.extension().map_or(false, |e| e == "parquet") {
600            let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
601
602            // Filename-level date pruning
603            let dominated_by_from = from_date.as_ref().map_or(false, |fd| stem < fd.as_str());
604            let past_to = to_date.as_ref().map_or(false, |td| stem > td.as_str());
605
606            if !dominated_by_from && !past_to {
607                files.push(path);
608            }
609        }
610    }
611
612    files.sort();
613    Ok(files)
614}
615
616/// Read a single Parquet file into a DataFrame.
617fn read_parquet_file(path: &Path) -> Result<DataFrame> {
618    let file = std::fs::File::open(path)?;
619    let df = ParquetReader::new(file).finish()?;
620    Ok(df)
621}
622
623/// Write a DataFrame to a Parquet file with zstd compression.
624fn write_parquet_file(path: &Path, df: &mut DataFrame) -> Result<()> {
625    let file = std::fs::File::create(path)?;
626    ParquetWriter::new(file)
627        .with_compression(ParquetCompression::Zstd(None))
628        .finish(df)?;
629    Ok(())
630}
631
632/// Concat two tick DataFrames, dedup on (exchange, symbol, ts), sort by ts.
633fn concat_and_dedup_ticks(existing: DataFrame, new: DataFrame) -> Result<DataFrame> {
634    let combined = concat_dataframes(vec![existing, new])?;
635    dedup_ticks(combined)
636}
637
638/// Dedup a tick DataFrame on (exchange, symbol, ts) keeping first, sort by ts.
639fn dedup_ticks(df: DataFrame) -> Result<DataFrame> {
640    let cols: Vec<String> = vec!["exchange".into(), "symbol".into(), "ts".into()];
641    let deduped = df
642        .unique_stable(Some(&cols), UniqueKeepStrategy::First, None)?
643        .sort(["ts"], SortMultipleOptions::default())?;
644    Ok(deduped)
645}
646
647/// Concat two bar DataFrames, dedup on (exchange, symbol, timeframe, ts), sort by ts.
648fn concat_and_dedup_bars(existing: DataFrame, new: DataFrame) -> Result<DataFrame> {
649    let combined = concat_dataframes(vec![existing, new])?;
650    dedup_bars(combined)
651}
652
653/// Dedup a bar DataFrame on (exchange, symbol, timeframe, ts) keeping first, sort by ts.
654fn dedup_bars(df: DataFrame) -> Result<DataFrame> {
655    let cols: Vec<String> = vec![
656        "exchange".into(),
657        "symbol".into(),
658        "timeframe".into(),
659        "ts".into(),
660    ];
661    let deduped = df
662        .unique_stable(Some(&cols), UniqueKeepStrategy::First, None)?
663        .sort(["ts"], SortMultipleOptions::default())?;
664    Ok(deduped)
665}
666
667/// Vertically concatenate multiple DataFrames.
668fn concat_dataframes(dfs: Vec<DataFrame>) -> Result<DataFrame> {
669    if dfs.is_empty() {
670        return Err(DataError::Other("no dataframes to concat".into()));
671    }
672    if dfs.len() == 1 {
673        return Ok(dfs.into_iter().next().unwrap());
674    }
675    let lazy_frames: Vec<LazyFrame> = dfs.into_iter().map(|df| df.lazy()).collect();
676    let combined = polars::prelude::concat(lazy_frames, Default::default())?.collect()?;
677    Ok(combined)
678}
679
680/// Apply timestamp range filter to a DataFrame with a "ts" datetime column.
681fn apply_ts_filter(
682    df: DataFrame,
683    from: Option<NaiveDateTime>,
684    to: Option<NaiveDateTime>,
685) -> Result<DataFrame> {
686    if from.is_none() && to.is_none() {
687        return Ok(df);
688    }
689
690    let mut lf = df.lazy();
691
692    if let Some(f) = from {
693        let from_micros = f.and_utc().timestamp_micros();
694        lf = lf.filter(
695            col("ts")
696                .gt_eq(lit(from_micros).cast(DataType::Datetime(TimeUnit::Microseconds, None))),
697        );
698    }
699    if let Some(t) = to {
700        let to_micros = t.and_utc().timestamp_micros();
701        lf = lf.filter(
702            col("ts").lt_eq(lit(to_micros).cast(DataType::Datetime(TimeUnit::Microseconds, None))),
703        );
704    }
705
706    Ok(lf.collect()?)
707}
708
709/// Apply limit, tail, and descending pagination to a sorted DataFrame.
710fn apply_pagination(
711    df: DataFrame,
712    limit: usize,
713    tail: bool,
714    descending: bool,
715) -> Result<DataFrame> {
716    let result = if tail {
717        // Take last N rows, then optionally reverse for descending
718        let n = limit.min(df.height());
719        let tailed = df.tail(Some(n));
720        if descending {
721            tailed.sort(
722                ["ts"],
723                SortMultipleOptions::default().with_order_descending(true),
724            )?
725        } else {
726            tailed
727        }
728    } else if descending {
729        // Take first N from descending sort
730        let sorted = df.sort(
731            ["ts"],
732            SortMultipleOptions::default().with_order_descending(true),
733        )?;
734        sorted.head(Some(limit))
735    } else {
736        df.head(Some(limit))
737    };
738    Ok(result)
739}
740
741/// Delete rows from a date-partitioned directory, optionally within a date range.
742fn delete_from_partition(
743    dir: &Path,
744    from: Option<NaiveDateTime>,
745    to: Option<NaiveDateTime>,
746) -> Result<usize> {
747    if from.is_none() && to.is_none() {
748        // Delete everything in the directory
749        let count = count_all_rows_in_dir(dir);
750        // Remove all parquet files but keep the directory
751        for entry in fs::read_dir(dir)?.flatten() {
752            let path = entry.path();
753            if path.extension().map_or(false, |e| e == "parquet") {
754                fs::remove_file(&path)?;
755            }
756        }
757        return Ok(count);
758    }
759
760    let files = list_date_files(dir, from, to)?;
761    let mut total_deleted = 0usize;
762
763    for file_path in &files {
764        let df = read_parquet_file(file_path)?;
765        let original_count = df.height();
766
767        // Filter to keep rows OUTSIDE the delete range
768        let filtered = apply_ts_filter_inverted(df, from, to)?;
769
770        if filtered.height() == 0 {
771            // All rows deleted — remove the file
772            fs::remove_file(file_path)?;
773            total_deleted += original_count;
774        } else if filtered.height() < original_count {
775            // Partial deletion — rewrite the file
776            total_deleted += original_count - filtered.height();
777            write_parquet_file(file_path, &mut filtered.clone())?;
778        }
779        // else: no rows matched the range in this file
780    }
781
782    Ok(total_deleted)
783}
784
785/// Filter to keep rows OUTSIDE a timestamp range (inverse of apply_ts_filter).
786fn apply_ts_filter_inverted(
787    df: DataFrame,
788    from: Option<NaiveDateTime>,
789    to: Option<NaiveDateTime>,
790) -> Result<DataFrame> {
791    let mut lf = df.lazy();
792
793    match (from, to) {
794        (Some(f), Some(t)) => {
795            let from_micros = f.and_utc().timestamp_micros();
796            let to_micros = t.and_utc().timestamp_micros();
797            let from_lit = lit(from_micros).cast(DataType::Datetime(TimeUnit::Microseconds, None));
798            let to_lit = lit(to_micros).cast(DataType::Datetime(TimeUnit::Microseconds, None));
799            // Keep rows where ts < from OR ts > to
800            lf = lf.filter(col("ts").lt(from_lit).or(col("ts").gt(to_lit)));
801        }
802        (Some(f), None) => {
803            let from_micros = f.and_utc().timestamp_micros();
804            let from_lit = lit(from_micros).cast(DataType::Datetime(TimeUnit::Microseconds, None));
805            lf = lf.filter(col("ts").lt(from_lit));
806        }
807        (None, Some(t)) => {
808            let to_micros = t.and_utc().timestamp_micros();
809            let to_lit = lit(to_micros).cast(DataType::Datetime(TimeUnit::Microseconds, None));
810            lf = lf.filter(col("ts").gt(to_lit));
811        }
812        (None, None) => {}
813    }
814
815    Ok(lf.collect()?)
816}
817
818/// Count all rows across parquet files in a directory (non-recursive).
819fn count_all_rows_in_dir(dir: &Path) -> usize {
820    let mut count = 0;
821    if let Ok(entries) = fs::read_dir(dir) {
822        for entry in entries.flatten() {
823            let path = entry.path();
824            if path.extension().map_or(false, |e| e == "parquet") {
825                if let Ok(df) = read_parquet_file(&path) {
826                    count += df.height();
827                }
828            }
829        }
830    }
831    count
832}
833
/// Recursively walk a directory and collect all file paths.
/// A missing or unreadable directory yields an empty list.
fn walkdir(dir: &Path) -> Vec<PathBuf> {
    let mut paths = Vec::new();
    if let Ok(entries) = fs::read_dir(dir) {
        for entry in entries.flatten() {
            let path = entry.path();
            if path.is_dir() {
                // Recurse into subdirectories; only files are collected.
                paths.extend(walkdir(&path));
            } else {
                paths.push(path);
            }
        }
    }
    paths
}
852
853/// Convert microsecond epoch to NaiveDateTime.
854fn micros_to_ndt(micros: i64) -> NaiveDateTime {
855    let secs = micros / 1_000_000;
856    let nsecs = ((micros % 1_000_000) * 1_000) as u32;
857    chrono::DateTime::from_timestamp(secs, nsecs)
858        .map(|dt| dt.naive_utc())
859        .unwrap_or_default()
860}