//! lbl-core 0.1.0
//!
//! lbl is a toolkit for managing address labels.
use crate::types::*;
use polars::prelude::*;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

/// Convert the `address` column from binary to "0x"-prefixed hex strings
pub fn address_to_hex(mut data: DataFrame) -> Result<DataFrame, LblError> {
    if data.column("address")?.dtype() != &DataType::Binary {
        return Ok(data);
    }
    // hex-encode the raw bytes, then prepend "0x" to every value
    let address = data.column("address")?.binary()?.hex_encode();
    let address = address.str()?;
    let prefix: Series = vec!["0x".to_string(); address.len()].into_iter().collect();
    let prefix = prefix.with_name("address");
    let prefixed = prefix.str()?.concat(address);
    data.with_column(prefixed)?;
    Ok(data)
}

/// Convert the `address` column from "0x"-prefixed hex strings to binary
pub fn address_to_binary(mut data: DataFrame) -> Result<DataFrame, LblError> {
    if data.column("address")?.dtype() != &DataType::String {
        return Ok(data);
    }
    let address_column = data.column("address")?;
    let height = address_column.len();
    // slice off the leading "0x" (offset 2), then decode the remaining hex digits
    let offset: Series = vec![2; height].into_iter().collect();
    let length: Series = vec![42; height].into_iter().collect();
    let trimmed = address_column.str()?.str_slice(&offset, &length)?;
    let decoded = trimmed.hex_decode(true)?;
    data.with_column(decoded)?;
    Ok(data)
}
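
// A minimal round-trip sketch for the two conversions above (test-only, not
// part of the public API; assumes `LblError` converts from `PolarsError`, as
// the `?` usage in this module implies).
#[cfg(test)]
mod address_conversion_example {
    use super::*;

    #[test]
    fn hex_binary_roundtrip() {
        // one 20-byte address stored as raw bytes
        let bytes = vec![0xab_u8; 20];
        let address = Series::new("address", &[bytes.as_slice()]);
        let df = DataFrame::new(vec![address]).unwrap();

        // binary -> hex: "0x" + 40 hex chars = 42 chars total
        let hex = address_to_hex(df).unwrap();
        let value = hex.column("address").unwrap().str().unwrap().get(0).unwrap();
        assert!(value.starts_with("0x") && value.len() == 42);

        // hex -> binary restores the original bytes
        let binary = address_to_binary(hex).unwrap();
        let decoded = binary.column("address").unwrap().binary().unwrap().get(0).unwrap();
        assert_eq!(decoded, bytes.as_slice());
    }
}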

/// Map of standard column names to their dtypes
pub(crate) fn get_standard_columns() -> HashMap<String, DataType> {
    [
        ("address", DataType::Binary),
        ("collection", DataType::String),
        ("name", DataType::String),
        ("project", DataType::String),
        ("class", DataType::String),
        ("network", DataType::String),
        ("extra_data", DataType::String),
        ("added_by", DataType::String),
        ("date_added", DataType::String),
    ]
    .into_iter()
    .map(|(s, dt)| (s.to_string(), dt))
    .collect()
}

/// Standard column order for collection files
pub(crate) fn get_standard_column_order() -> Vec<String> {
    vec![
        "address".to_string(),
        "collection".to_string(),
        "name".to_string(),
        "project".to_string(),
        "class".to_string(),
        "network".to_string(),
        "extra_data".to_string(),
        "added_by".to_string(),
        "date_added".to_string(),
    ]
}
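
// A small consistency sketch (test-only): the explicit column order above
// should cover exactly the keys of `get_standard_columns`.
#[cfg(test)]
mod schema_consistency_example {
    use super::*;

    #[test]
    fn column_order_matches_standard_columns() {
        let columns = get_standard_columns();
        let order = get_standard_column_order();
        assert_eq!(columns.len(), order.len());
        for name in &order {
            assert!(columns.contains_key(name));
        }
    }
}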

/// Standardize raw collection data: fill in the collection name, add missing
/// standard columns, drop extra columns, convert addresses to binary, and
/// reorder columns
pub fn standardize_collection(
    mut df: DataFrame,
    metadata: &CollectionData,
) -> Result<DataFrame, LblError> {
    // fill in the collection column from metadata if it is missing
    if let (Err(_), Some(collection)) = (df.column("collection"), metadata.collection.clone()) {
        let column: Series = vec![collection; df.height()].into_iter().collect();
        df.with_column(column.with_name("collection"))?;
    };

    let df_columns: Vec<String> = df
        .get_column_names()
        .iter()
        .map(|name| name.to_string())
        .collect();
    let standard_columns = get_standard_columns();

    // add missing standard columns as all-null columns
    for (column, dtype) in standard_columns.iter() {
        if !df_columns.contains(column) {
            let series = create_null_column(column.clone(), dtype.clone(), df.height());
            df.with_column(series)?;
        }
    }

    // remove non-standard columns
    for column in df_columns {
        if !standard_columns.contains_key(&column) {
            df = df.drop(&column)?;
        }
    }

    // convert address column to binary
    df = address_to_binary(df)?;

    // reorder columns into the standard order
    let df = df.select(get_standard_column_order())?;

    Ok(df)
}
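
// Usage sketch for `standardize_collection` (test-only). The `CollectionData`
// construction below is hypothetical: it assumes the type has a
// `collection: Option<String>` field (implied by the usage above) and derives
// `Default`; adjust to the real definition in `crate::types`.
#[cfg(test)]
mod standardize_example {
    use super::*;

    #[test]
    fn fills_missing_columns() {
        // a raw frame with only a hex address column
        let address = Series::new(
            "address",
            &["0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"],
        );
        let df = DataFrame::new(vec![address]).unwrap();
        let metadata = CollectionData {
            collection: Some("my_collection".to_string()),
            ..Default::default() // hypothetical; assumes CollectionData: Default
        };

        let standardized = standardize_collection(df, &metadata).unwrap();

        // all standard columns present, in the standard order
        let names: Vec<String> = standardized
            .get_column_names()
            .iter()
            .map(|name| name.to_string())
            .collect();
        assert_eq!(names, get_standard_column_order());
        // the address column was converted to binary
        assert_eq!(
            standardized.column("address").unwrap().dtype(),
            &DataType::Binary
        );
    }
}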

/// Create an all-null Series with the given name, dtype, and length
fn create_null_column(name: String, dtype: DataType, len: usize) -> Series {
    match dtype {
        DataType::String => Series::new(name.as_str(), vec![None::<&str>; len]),
        DataType::Binary => Series::new(name.as_str(), vec![None::<&[u8]>; len]),
        _ => unimplemented!("unsupported data type: {:?}", dtype),
    }
}
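
// Note: polars also provides `Series::full_null(name, len, dtype)`, which
// covers all dtypes; the explicit match above restricts null columns to the
// two dtypes used by the standard schema.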

/// Count total rows across collection files, dispatching on file extension
pub fn get_row_counts(paths: Vec<PathBuf>) -> Result<i64, LblError> {
    let paths_by_extension = crate::filesystem::paths_by_extension(paths);
    let mut row_count = 0;
    for (extension, extension_paths) in paths_by_extension.into_iter() {
        let arc_paths: Arc<[PathBuf]> = extension_paths.into();
        let lf = match extension.as_str() {
            "csv" => LazyCsvReader::new_paths(arc_paths).finish()?,
            "parquet" => LazyFrame::scan_parquet_files(arc_paths, ScanArgsParquet::default())?,
            other => {
                return Err(LblError::LblError(format!(
                    "unknown file extension: {}",
                    other
                )))
            }
        };
        // count rows lazily so only the address column is materialized
        let counts = lf
            .select([col("address")])
            .count()
            .select([col("address").cast(DataType::Int64)])
            .collect()?;
        row_count += counts.column("address")?.i64()?.get(0).unwrap_or(0);
    }
    Ok(row_count)
}
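
// End-to-end sketch for `get_row_counts` (test-only): write a tiny csv via
// `write_file` below, then count its rows. Assumes the test may write to the
// system temp directory and that `crate::filesystem::paths_by_extension`
// groups the path under "csv".
#[cfg(test)]
mod row_count_example {
    use super::*;

    #[test]
    fn counts_csv_rows() {
        let address = Series::new(
            "address",
            &[
                "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
                "0xfeedfacefeedfacefeedfacefeedfacefeedface",
            ],
        );
        let df = DataFrame::new(vec![address]).unwrap();
        let path = std::env::temp_dir().join("lbl_row_count_example.csv");
        write_file(&df, &path).unwrap();
        assert_eq!(get_row_counts(vec![path]).unwrap(), 2);
    }
}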

/// Write a collection DataFrame to disk as csv or parquet, based on the
/// path's extension
pub fn write_file(df: &DataFrame, path: &PathBuf) -> Result<(), LblError> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|e| LblError::LblError(format!("could not create directory: {:?}", e)))?;
    }

    let extension = path
        .extension()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("");
    let file = std::fs::File::create(path)
        .map_err(|e| LblError::LblError(format!("could not write file: {:?}", e)))?;
    let mut df = df.clone();

    match extension {
        "csv" => {
            // csv stores addresses as "0x"-prefixed hex strings
            let mut df = address_to_hex(df)?;
            CsvWriter::new(file).finish(&mut df)?
        }
        "parquet" => {
            // target roughly 100 rows per row group, capped at 64 groups;
            // `with_row_group_size` takes rows per group, so divide
            let n_row_groups = match df.height() {
                height if height < 100 => 1,
                height if height < 6400 => height / 100,
                _ => 64,
            };
            let row_group_size = (df.height() / n_row_groups).max(1);
            ParquetWriter::new(file)
                .with_statistics(true)
                .with_compression(ParquetCompression::Zstd(None))
                .with_row_group_size(Some(row_group_size))
                .finish(&mut df)?;
        }
        _ => {
            return Err(LblError::LblError(
                "file extension must be csv or parquet".to_string(),
            ))
        }
    };
    Ok(())
}
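
// Parquet usage sketch for `write_file` (test-only): the format is chosen
// from the extension, and parent directories are created as needed.
#[cfg(test)]
mod write_file_example {
    use super::*;

    #[test]
    fn writes_and_rereads_parquet() {
        let address = Series::new(
            "address",
            &["0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"],
        );
        let df = DataFrame::new(vec![address]).unwrap();
        let path = std::env::temp_dir()
            .join("lbl_write_example")
            .join("labels.parquet");
        write_file(&df, &path).unwrap();

        // read the file back lazily and confirm the row count
        let read = LazyFrame::scan_parquet(path, ScanArgsParquet::default())
            .unwrap()
            .collect()
            .unwrap();
        assert_eq!(read.height(), 1);
    }
}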