// atelier_data 0.0.15
//
// Data Artifacts and I/O for the atelier-rs engine
//! Multi-file parquet loader with chronological concatenation.
//!
//! Parquet files produced by `atelier-data` are segmented by time windows
//! (typically ~100s each). This module scans a directory, sorts files
//! chronologically by filename, loads each into memory, and concatenates
//! into a single contiguous time series.

use std::path::Path;

use crate::{
    errors::LoaderError,
    orderbooks::{Orderbook, io::ob_parquet::load_parquet_to_ob},
    trades::{Trade, io::trades_parquet::read_trades_parquet},
};

/// Read every orderbook parquet file in a directory and merge the snapshots
/// into a single, time-ordered vector.
///
/// The `.parquet` files found directly inside `dir` are sorted by path
/// (filenames are expected to embed a timestamp), loaded one after another,
/// and the combined snapshots are finally sorted by `orderbook_ts` so the
/// result is globally ordered even when files overlap or arrive out of order.
///
/// # Arguments
///
/// * `dir` - Path to the directory containing orderbook parquet files.
///
/// # Errors
///
/// * `LoaderError::IoError` if `dir` is not a directory, cannot be read, or
///   if any parquet file fails to load (the message names the failing file).
/// * `LoaderError::EmptyData` if no snapshots were loaded at all.
///
/// # Examples
///
/// ```no_run
/// use std::path::Path;
/// use atelier_data::datasets::loaders::load_orderbooks_from_dir;
///
/// let obs = load_orderbooks_from_dir(Path::new("datasets/collected/bybit/ob")).unwrap();
/// println!("Loaded {} orderbook snapshots", obs.len());
/// ```
pub fn load_orderbooks_from_dir(dir: &Path) -> Result<Vec<Orderbook>, LoaderError> {
    let mut files = collect_parquet_paths(dir)?;
    files.sort();

    let mut snapshots: Vec<Orderbook> = Vec::new();
    for file in files.iter() {
        match load_parquet_to_ob(file) {
            Ok(batch) => snapshots.extend(batch),
            Err(err) => {
                return Err(LoaderError::IoError(format!(
                    "Failed to load {}: {}",
                    file.display(),
                    err
                )));
            }
        }
    }

    // Nothing to order when no file contributed a snapshot.
    if snapshots.is_empty() {
        return Err(LoaderError::EmptyData);
    }

    // Per-file order is not trusted across file boundaries, so re-sort globally.
    snapshots.sort_by_key(|snapshot| snapshot.orderbook_ts);
    Ok(snapshots)
}

/// Read every trade parquet file in a directory and merge the records into a
/// single, time-ordered vector.
///
/// The `.parquet` files found directly inside `dir` are sorted by path,
/// loaded one after another, and the combined `Trade` records are finally
/// sorted by `trade_ts`.
///
/// # Arguments
///
/// * `dir` - Path to the directory containing trade parquet files.
///
/// # Errors
///
/// * `LoaderError::IoError` if `dir` is not a directory, cannot be read, or
///   if any parquet file fails to load (the message names the failing file).
/// * `LoaderError::EmptyData` if no trades were loaded at all.
pub fn load_trades_from_dir(dir: &Path) -> Result<Vec<Trade>, LoaderError> {
    let mut files = collect_parquet_paths(dir)?;
    files.sort();

    let mut merged: Vec<Trade> = Vec::new();
    for file in files.iter() {
        match read_trades_parquet(file) {
            Ok(batch) => merged.extend(batch),
            Err(err) => {
                return Err(LoaderError::IoError(format!(
                    "Failed to load {}: {}",
                    file.display(),
                    err
                )));
            }
        }
    }

    // Nothing to order when no file contributed a trade.
    if merged.is_empty() {
        return Err(LoaderError::EmptyData);
    }

    // Re-sort globally; ordering across file boundaries is not assumed.
    merged.sort_by_key(|trade| trade.trade_ts);
    Ok(merged)
}

/// Gather the paths of the `.parquet` files located directly inside `dir`.
///
/// Only entries for which `Path::is_file` is true and whose extension is
/// exactly `parquet` are kept; subdirectories are not descended into. The
/// paths come back in directory-iteration order, so callers sort them.
///
/// # Errors
///
/// Returns `LoaderError::IoError` if `dir` is not a directory, if it cannot
/// be opened for reading, or if reading any individual entry fails.
fn collect_parquet_paths(dir: &Path) -> Result<Vec<std::path::PathBuf>, LoaderError> {
    if !dir.is_dir() {
        return Err(LoaderError::IoError(format!(
            "Not a directory: {}",
            dir.display()
        )));
    }

    let listing = match std::fs::read_dir(dir) {
        Ok(listing) => listing,
        Err(err) => {
            return Err(LoaderError::IoError(format!(
                "Cannot read directory {}: {}",
                dir.display(),
                err
            )));
        }
    };

    let mut found = Vec::new();
    for item in listing {
        let candidate = match item {
            Ok(item) => item.path(),
            Err(err) => {
                return Err(LoaderError::IoError(format!(
                    "Directory entry error: {}",
                    err
                )));
            }
        };

        // Skip directories and anything else that is not a regular file.
        if !candidate.is_file() {
            continue;
        }

        if matches!(candidate.extension(), Some(ext) if ext == "parquet") {
            found.push(candidate);
        }
    }

    Ok(found)
}