//! atelier_data 0.0.15
//!
//! Data Artifacts and I/O for the atelier-rs engine
//! Parquet I/O for [`Liquidation`] data.
//!
//! Provides write and read functions for persisting liquidation data in Apache
//! Parquet columnar format with Snappy compression.
//!
//! # Feature Flag
//!
//! Requires the `parquet` feature:
//!
//! ```toml
//! [dependencies]
//! atelier_data = { version = "...", features = ["parquet"] }
//! ```

use crate::errors::PersistError;
use crate::liquidations::Liquidation;
use std::path::Path;

/// Write a batch of [`Liquidation`] records to a Parquet file with Snappy compression.
///
/// # Output Schema
///
/// | Column | Arrow Type | Description |
/// |--------|------------|-------------|
/// | `liquidation_ts` | `UInt64` | Unix timestamp in milliseconds |
/// | `symbol` | `Utf8` | Trading pair symbol |
/// | `side` | `Utf8` | "Buy" or "Sell" |
/// | `price` | `Float64` | Liquidation price |
/// | `amount` | `Float64` | Liquidation quantity |
/// | `exchange` | `Utf8` | Exchange name |
#[cfg(feature = "parquet")]
pub fn write_liquidations_parquet(
    liquidations: &[Liquidation],
    path: &Path,
) -> Result<(), PersistError> {
    use arrow::{
        array::{Float64Array, StringArray, UInt64Array},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use parquet::{
        arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties,
    };
    use std::{fs::File, sync::Arc};

    // Transpose the record slice into one column vector per field.
    // String columns borrow from the input slice, so no per-record
    // allocation happens here.
    let timestamps: Vec<_> = liquidations.iter().map(|l| l.liquidation_ts).collect();
    let symbols: Vec<&str> = liquidations.iter().map(|l| l.symbol.as_str()).collect();
    let sides: Vec<&str> = liquidations.iter().map(|l| l.side.as_str()).collect();
    let prices: Vec<_> = liquidations.iter().map(|l| l.price).collect();
    let amounts: Vec<_> = liquidations.iter().map(|l| l.amount).collect();
    let exchanges: Vec<&str> = liquidations.iter().map(|l| l.exchange.as_str()).collect();

    // All columns are declared non-nullable; readers may rely on every
    // value being present.
    let schema = Arc::new(Schema::new(vec![
        Field::new("liquidation_ts", DataType::UInt64, false),
        Field::new("symbol", DataType::Utf8, false),
        Field::new("side", DataType::Utf8, false),
        Field::new("price", DataType::Float64, false),
        Field::new("amount", DataType::Float64, false),
        Field::new("exchange", DataType::Utf8, false),
    ]));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(UInt64Array::from(timestamps)),
            Arc::new(StringArray::from(symbols)),
            Arc::new(StringArray::from(sides)),
            Arc::new(Float64Array::from(prices)),
            Arc::new(Float64Array::from(amounts)),
            Arc::new(StringArray::from(exchanges)),
        ],
    )?;

    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let mut writer = ArrowWriter::try_new(File::create(path)?, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    writer.close()?;

    Ok(())
}

/// Stub for when the `parquet` feature is not enabled.
///
/// Always fails with [`PersistError::UnsupportedFormat`].
#[cfg(not(feature = "parquet"))]
pub fn write_liquidations_parquet(
    _liquidations: &[Liquidation],
    _path: &Path,
) -> Result<(), PersistError> {
    let reason =
        String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}

/// Read [`Liquidation`] records from a Parquet file.
///
/// Expects the column layout produced by [`write_liquidations_parquet`]:
/// `liquidation_ts`, `symbol`, `side`, `price`, `amount`, `exchange`, in
/// that order (columns are accessed by index, not by name).
///
/// # Errors
///
/// Returns [`PersistError`] if the file cannot be opened or is not valid
/// Parquet with the expected schema.
#[cfg(feature = "parquet")]
pub fn read_liquidations_parquet(path: &Path) -> Result<Vec<Liquidation>, PersistError> {
    use arrow::array::AsArray;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use std::fs::File;

    let file = File::open(path)?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    // The Parquet footer records the exact row count; preallocate so the
    // result vector never reallocates while rows are pushed.
    let total_rows =
        usize::try_from(builder.metadata().file_metadata().num_rows()).unwrap_or(0);
    let reader = builder.build()?;

    let mut liquidations = Vec::with_capacity(total_rows);

    for batch_result in reader {
        let batch = batch_result?;

        // Downcast each column once per batch, then copy rows out.
        let timestamps = batch
            .column(0)
            .as_primitive::<arrow::datatypes::UInt64Type>();
        let symbols = batch.column(1).as_string::<i32>();
        let sides = batch.column(2).as_string::<i32>();
        let prices = batch
            .column(3)
            .as_primitive::<arrow::datatypes::Float64Type>();
        let amounts = batch
            .column(4)
            .as_primitive::<arrow::datatypes::Float64Type>();
        let exchanges = batch.column(5).as_string::<i32>();

        for i in 0..batch.num_rows() {
            liquidations.push(Liquidation {
                liquidation_ts: timestamps.value(i),
                symbol: symbols.value(i).to_string(),
                side: sides.value(i).to_string(),
                price: prices.value(i),
                amount: amounts.value(i),
                exchange: exchanges.value(i).to_string(),
            });
        }
    }

    Ok(liquidations)
}

/// Stub for when the `parquet` feature is not enabled.
///
/// Always fails with [`PersistError::UnsupportedFormat`].
#[cfg(not(feature = "parquet"))]
pub fn read_liquidations_parquet(_path: &Path) -> Result<Vec<Liquidation>, PersistError> {
    let reason =
        String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}

/// Write a batch of liquidations to a timestamped Parquet file in `output_dir`.
///
/// # Filename convention
///
/// Output file: `{EXCHANGE}_{SYMBOL}_liquidations_{MODE}_{TIMESTAMP}.parquet`
///
/// The exchange and symbol are taken from the first record; the timestamp
/// is the current UTC time formatted as `%Y%m%d_%H%M%S%.3f`. The symbol is
/// sanitised for filesystem safety (`/` → `-`), so Kraken's
/// `BTC/USDT` becomes `BTC-USDT` in the filename while the Parquet data
/// retains the original symbol string.
///
/// Examples: `binance_BTCUSDT_liquidations_sync_20260226_153000.123.parquet`,
///           `kraken_BTC-USDT_liquidations_sync_20260226_153000.123.parquet`
///
/// # Arguments
///
/// * `liquidations` — Liquidation records to write. If empty, `"unknown"`
///   is used for the exchange and symbol parts of the filename and an
///   empty Parquet file is written.
/// * `output_dir` — Target directory (must exist).
/// * `mode` — Tag embedded in the filename, typically `"sync"` for
///   grid-aligned data or `"raw"` for unprocessed captures.
///
/// Returns the full path to the written file.
#[cfg(feature = "parquet")]
pub fn write_liquidations_parquet_timestamped(
    liquidations: &[Liquidation],
    output_dir: &Path,
    mode: &str,
) -> Result<std::path::PathBuf, PersistError> {
    // Millisecond-resolution UTC timestamp keeps successive writes unique.
    let file_ts = chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f");

    // Filename metadata comes from the first record; all records are
    // assumed to share one exchange/symbol (not validated here).
    let raw_symbol = liquidations
        .first()
        .map(|l| l.symbol.as_str())
        .unwrap_or("unknown");
    let exchange = liquidations
        .first()
        .map(|l| l.exchange.as_str())
        .unwrap_or("unknown");
    // Sanitise only the filename; the data keeps the original symbol.
    let symbol = raw_symbol.replace('/', "-");

    let filename = format!(
        "{}_{}_liquidations_{}_{}.parquet",
        exchange, symbol, mode, file_ts
    );
    let path = output_dir.join(filename);

    write_liquidations_parquet(liquidations, &path)?;
    Ok(path)
}

/// Stub for when the `parquet` feature is not enabled.
///
/// Always fails with [`PersistError::UnsupportedFormat`].
#[cfg(not(feature = "parquet"))]
pub fn write_liquidations_parquet_timestamped(
    _liquidations: &[Liquidation],
    _output_dir: &Path,
    _mode: &str,
) -> Result<std::path::PathBuf, PersistError> {
    let reason =
        String::from("parquet support not compiled in (enable 'parquet' feature)");
    Err(PersistError::UnsupportedFormat(reason))
}