//! lbl-core 0.1.0
//!
//! lbl is a toolkit for managing address labels. This module locates label
//! collection files under the data root and queries them.
use crate::types::*;
use polars::prelude::*;
use std::collections::HashMap;
use std::path::{Path, PathBuf};

/// Resolve the root directory that holds label data.
///
/// `LBL_DATA_DIR` wins when it is set (and is valid unicode). Otherwise the root is
/// `<home>/.lbl`, where `<home>` is taken from `HOME`, then `USERPROFILE`, and finally
/// falls back to the current directory (`.`) when neither variable is available.
pub fn get_data_root() -> std::path::PathBuf {
    if let Ok(explicit_root) = std::env::var("LBL_DATA_DIR") {
        return std::path::PathBuf::from(explicit_root);
    }
    let home = std::env::var("HOME")
        .or_else(|_| std::env::var("USERPROFILE"))
        .unwrap_or_else(|_| ".".to_string());
    std::path::PathBuf::from(home).join(".lbl")
}

/// list networks in data directory
pub fn list_networks(query: &Query) -> Result<Vec<String>, LblError> {
    let paths = query_collection_paths_by_network(query)?;
    Ok(paths.into_keys().collect())
}

/// List the collection names found across the selected networks.
///
/// A collection name that appears under several networks is reported once. Results are
/// sorted so the output is stable between calls; the underlying `HashMap` has no defined
/// order.
///
/// # Errors
/// Propagates any error raised while crawling the data directory.
pub fn list_collections(query: &Query) -> Result<Vec<String>, LblError> {
    let mut collections: Vec<String> =
        query_collection_paths_by_collection(query)?.into_keys().collect();
    collections.sort_unstable();
    Ok(collections)
}

/// Gather every collection file path matching `query` into one flat list.
///
/// Paths from all networks and all of their collections are concatenated. The order
/// follows `HashMap` iteration and is therefore unspecified.
pub fn query_collection_paths_flat(query: &Query) -> Result<Vec<PathBuf>, LblError> {
    let tree = query_collection_paths_tree(query)?;
    let flat = tree
        .into_values()
        .flat_map(|by_collection| by_collection.into_values())
        .flatten()
        .collect();
    Ok(flat)
}

/// Map each network to all of its collection file paths, merging its collections.
///
/// Every network present in [`query_collection_paths_tree`] gets an entry, even if it
/// ends up with no files.
///
/// # Errors
/// Propagates any error raised while crawling the data directory.
pub fn query_collection_paths_by_network(
    query: &Query,
) -> Result<HashMap<String, Vec<PathBuf>>, LblError> {
    let tree = query_collection_paths_tree(query)?;
    // Network keys in `tree` are already unique, so each one maps straight to its
    // flattened file list; no merging between entries is required.
    let paths = tree
        .into_iter()
        .map(|(network, collections)| {
            let files: Vec<PathBuf> = collections.into_values().flatten().collect();
            (network, files)
        })
        .collect();
    Ok(paths)
}

/// Map each collection name to its file paths across all selected networks.
///
/// A collection name that exists under several networks receives the union of the
/// files from each of them.
///
/// # Errors
/// Propagates any error raised while crawling the data directory.
pub fn query_collection_paths_by_collection(
    query: &Query,
) -> Result<HashMap<String, Vec<PathBuf>>, LblError> {
    let mut paths: HashMap<String, Vec<PathBuf>> = HashMap::new();
    for network_collections in query_collection_paths_tree(query)?.into_values() {
        for (collection, collection_paths) in network_collections {
            // One hash lookup per collection; creates the Vec on first sight.
            paths.entry(collection).or_default().extend(collection_paths);
        }
    }
    Ok(paths)
}

/// Crawl the data root and return a mapping of `{network => {collection => [path]}}`.
///
/// If `query.networks` is set, only those network directories are considered; otherwise
/// every subdirectory of [`get_data_root`] is treated as a network. A network is included
/// only when its directory exists and contains at least one entry. A missing data root
/// yields an empty map rather than an error.
///
/// # Errors
/// Propagates filesystem errors raised while listing directories.
pub fn query_collection_paths_tree(
    query: &Query,
) -> Result<HashMap<String, HashMap<String, Vec<std::path::PathBuf>>>, LblError> {
    let mut paths = HashMap::new();

    let data_root = get_data_root();
    if !data_root.is_dir() {
        return Ok(paths);
    }

    let networks = match &query.networks {
        Some(networks) => networks.clone(),
        None => get_child_dirnames(&data_root)?,
    };

    for network in networks {
        let network_path = data_root.join(&network);
        // `next().is_some()` stops at the first entry instead of reading the whole
        // directory just to learn whether it is empty.
        if network_path.is_dir() && std::fs::read_dir(&network_path)?.next().is_some() {
            let collections = get_network_collection_paths(network_path, query)?;
            paths.insert(network, collections);
        }
    }

    Ok(paths)
}

/// get children directories of directory
fn get_child_dirnames(dir: &PathBuf) -> Result<Vec<String>, LblError> {
    let mut directories = Vec::new();
    for entry in std::fs::read_dir(dir)? {
        let path = entry?.path();
        if path.is_dir() {
            directories.push(get_leaf_dirname(&path)?);
        }
    }

    Ok(directories)
}

/// Return the final component of `dir_path` as a `String`.
///
/// The name is taken from the path as given, so a symlinked directory keeps the link's
/// own name. That is the name `query_collection_paths_tree` later joins back onto the
/// data root, so resolving the symlink target here would make the directory unreachable.
/// Only when the path has no final component of its own (for example `.` or a path
/// ending in `..`) is it canonicalized to find one. Non-UTF-8 names are converted lossily.
///
/// # Errors
/// Fails if canonicalization is required and fails, or if no name can be determined.
fn get_leaf_dirname(dir_path: &Path) -> Result<String, LblError> {
    let name = match dir_path.file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => dir_path
            .canonicalize()?
            .file_name()
            .ok_or_else(|| LblError::LblError("could not convert path".to_string()))?
            .to_string_lossy()
            .into_owned(),
    };
    Ok(name)
}

/// Map each non-empty collection directory under `network_path` to its collection files.
///
/// A subdirectory counts as a collection when it contains at least one entry. Its value
/// lists only the entries accepted by `is_collection_file`, so it may be empty.
/// `_query` is currently unused.
///
/// # Errors
/// Propagates filesystem errors and failures to derive a directory name.
fn get_network_collection_paths(
    network_path: PathBuf,
    _query: &Query,
) -> Result<HashMap<String, Vec<std::path::PathBuf>>, LblError> {
    let mut paths = HashMap::new();
    for entry in std::fs::read_dir(network_path)? {
        let collection_dir = entry?.path();
        if !collection_dir.is_dir() {
            continue;
        }
        // Open the directory once: peeking reveals whether it is empty without a second
        // `read_dir` call or a full `count()` pass over its entries.
        let mut children = std::fs::read_dir(&collection_dir)?.peekable();
        if children.peek().is_none() {
            continue;
        }
        let collection_name = get_leaf_dirname(&collection_dir)?;
        let mut collection_paths = Vec::new();
        for child in children {
            let file = child?.path();
            if is_collection_file(&file) {
                collection_paths.push(file);
            }
        }
        paths.insert(collection_name, collection_paths);
    }
    Ok(paths)
}

/// Report whether `path` names a collection file.
///
/// A collection file has a `.parquet` or `.csv` extension, compared case-insensitively.
/// Paths without an extension, or whose extension is not valid UTF-8, are rejected.
fn is_collection_file(path: &Path) -> bool {
    path.extension().map_or(false, |raw_ext| {
        let normalized = raw_ext.to_str().unwrap_or("").to_lowercase();
        matches!(normalized.as_str(), "parquet" | "csv")
    })
}

/// Load every label collection matching `query` into a single `DataFrame`.
///
/// Collection files are split by extension, case-insensitively to match
/// `is_collection_file`. Each group is scanned lazily, filtered with `add_query_filters`,
/// collected, and passed through `crate::standardize_collection`. Parquet rows come first,
/// followed by CSV rows.
///
/// # Errors
/// Returns `LblError::LblError("no valid paths")` when no collection files match.
/// Propagates scan, filter, collect, standardize, and `vstack` failures.
pub fn query(query: &Query) -> Result<DataFrame, LblError> {
    let mut csv_paths = Vec::new();
    let mut parquet_paths = Vec::new();
    for path in query_collection_paths_flat(query)? {
        // Lower-case the extension so that e.g. `labels.CSV`, which `is_collection_file`
        // accepts, is not silently dropped here.
        let extension = path
            .extension()
            .and_then(|ext| ext.to_str())
            .map(str::to_lowercase);
        match extension.as_deref() {
            Some("csv") => csv_paths.push(path),
            Some("parquet") => parquet_paths.push(path),
            _ => (),
        }
    }

    let parquet_data = if parquet_paths.is_empty() {
        None
    } else {
        // `Arc<[PathBuf]>` is built directly from the Vec; no intermediate Arc or clone.
        let paths: Arc<[PathBuf]> = Arc::from(parquet_paths);
        let lf = LazyFrame::scan_parquet_files(paths, ScanArgsParquet::default())?;
        let df = add_query_filters(lf, query, false)?.collect()?;
        Some(crate::standardize_collection(df, &CollectionData::default())?)
    };

    let csv_data = if csv_paths.is_empty() {
        None
    } else {
        let paths: Arc<[PathBuf]> = Arc::from(csv_paths);
        let lf = LazyCsvReader::new_paths(paths).finish()?;
        let df = add_query_filters(lf, query, true)?.collect()?;
        Some(crate::standardize_collection(df, &CollectionData::default())?)
    };

    match (parquet_data, csv_data) {
        (Some(parquet_data), Some(csv_data)) => Ok(parquet_data.vstack(&csv_data)?),
        (Some(parquet_data), None) => Ok(parquet_data),
        (None, Some(csv_data)) => Ok(csv_data),
        (None, None) => Err(LblError::LblError("no valid paths".to_string())),
    }
}

fn add_query_filters(lf: LazyFrame, query: &Query, csv: bool) -> Result<LazyFrame, LblError> {
    // filter addresses
    let lf = if let Some(addresses) = query.addresses.clone() {
        if csv {
            let addresses: Vec<Expr> = addresses
                .into_iter()
                .map(|address| {
                    Expr::Literal(LiteralValue::String(format!("0x{}", hex::encode(address))))
                })
                .collect();
            // lf.filter(col("address").str().to_lowercase().is_in(addresses))
            if addresses.len() == 1 {
                let address = addresses
                    .first()
                    .ok_or(LblError::LblError("could not get address".to_string()))?;
                lf.filter(col("address").str().to_lowercase().eq(address.clone()))
            } else {
                return Err(LblError::LblError(
                    "not implemented multiple addresses".to_string(),
                ));
            }
        } else {
            let addresses: Vec<Expr> = addresses
                .into_iter()
                .map(|address| Expr::Literal(LiteralValue::Binary(address)))
                .collect();
            // lf.filter(col("address").is_in(addresses))
            if addresses.len() == 1 {
                let address = addresses
                    .first()
                    .ok_or(LblError::LblError("could not get address".to_string()))?;
                lf.filter(col("address").str().to_lowercase().eq(address.clone()))
            } else {
                return Err(LblError::LblError(
                    "not implemented multiple addresses".to_string(),
                ));
            }
        }
    } else {
        lf
    };

    Ok(lf)
}