use crate::types::*;
use polars::prelude::*;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
/// Returns the directory under which lbl data is stored.
///
/// Resolution order:
/// 1. the `LBL_DATA_DIR` environment variable, used verbatim when it can be read;
/// 2. otherwise `<home>/.lbl`, where `<home>` is `HOME`, then `USERPROFILE`,
///    then the current directory (`.`) if neither variable can be read.
///
/// The returned path is not checked for existence.
pub fn get_data_root() -> std::path::PathBuf {
    // An explicit override always wins.
    if let Ok(custom_root) = std::env::var("LBL_DATA_DIR") {
        return std::path::PathBuf::from(custom_root);
    }

    let home = std::env::var("HOME")
        .or_else(|_| std::env::var("USERPROFILE"))
        .unwrap_or_else(|_| ".".to_string());

    std::path::Path::new(&home).join(".lbl")
}
/// Lists the names of the networks that match `query`.
///
/// The names are the keys of [`query_collection_paths_by_network`]; they come
/// out of a `HashMap`, so their order is unspecified.
///
/// # Errors
///
/// Propagates any error from [`query_collection_paths_by_network`].
pub fn list_networks(query: &Query) -> Result<Vec<String>, LblError> {
    query_collection_paths_by_network(query).map(|by_network| by_network.into_keys().collect())
}
/// Lists the names of the collections that match `query`.
///
/// The names are the keys of [`query_collection_paths_by_collection`]; they
/// come out of a `HashMap`, so their order is unspecified.
///
/// # Errors
///
/// Propagates any error from [`query_collection_paths_by_collection`].
pub fn list_collections(query: &Query) -> Result<Vec<String>, LblError> {
    query_collection_paths_by_collection(query)
        .map(|by_collection| by_collection.into_keys().collect())
}
/// Returns every collection file matching `query` as one flat list.
///
/// This is the network -> collection -> files tree from
/// [`query_collection_paths_tree`] with both levels of grouping removed.
///
/// # Errors
///
/// Propagates any error from [`query_collection_paths_tree`].
pub fn query_collection_paths_flat(query: &Query) -> Result<Vec<PathBuf>, LblError> {
    let flat: Vec<PathBuf> = query_collection_paths_tree(query)?
        .into_values()
        .flat_map(|collections| collections.into_values())
        .flatten()
        .collect();
    Ok(flat)
}
/// Groups the collection files matching `query` by network name.
///
/// Each network present in [`query_collection_paths_tree`] gets an entry whose
/// value is the concatenation of the file lists of all its collections. A
/// network with no collections still gets an (empty) entry.
///
/// # Errors
///
/// Propagates any error from [`query_collection_paths_tree`].
pub fn query_collection_paths_by_network(
    query: &Query,
) -> Result<HashMap<String, Vec<PathBuf>>, LblError> {
    let mut by_network: HashMap<String, Vec<PathBuf>> = HashMap::new();
    for (network, network_collections) in query_collection_paths_tree(query)? {
        // A single entry lookup creates the bucket on first sight, without
        // cloning the key or hashing it several times.
        let bucket = by_network.entry(network).or_default();
        for collection_paths in network_collections.into_values() {
            bucket.extend(collection_paths);
        }
    }
    Ok(by_network)
}
/// Groups the collection files matching `query` by collection name.
///
/// Collections that share a name across several networks are merged into a
/// single entry containing the files from all of those networks.
///
/// # Errors
///
/// Propagates any error from [`query_collection_paths_tree`].
pub fn query_collection_paths_by_collection(
    query: &Query,
) -> Result<HashMap<String, Vec<PathBuf>>, LblError> {
    let mut by_collection: HashMap<String, Vec<PathBuf>> = HashMap::new();
    for network_collections in query_collection_paths_tree(query)?.into_values() {
        for (collection, collection_paths) in network_collections {
            // The entry API merges same-named collections from different
            // networks with one lookup and no key clone.
            by_collection
                .entry(collection)
                .or_default()
                .extend(collection_paths);
        }
    }
    Ok(by_collection)
}
/// Builds the network -> collection -> files tree for `query`.
///
/// The networks considered are `query.networks` when it is set, otherwise
/// every child directory of the data root (see [`get_data_root`]). A network
/// is included only if its directory exists under the data root and contains
/// at least one entry.
///
/// If the data root is not a directory, an empty tree is returned.
///
/// # Errors
///
/// Returns an error if a directory cannot be read, or if
/// [`get_child_dirnames`] / [`get_network_collection_paths`] fail.
pub fn query_collection_paths_tree(
    query: &Query,
) -> Result<HashMap<String, HashMap<String, Vec<std::path::PathBuf>>>, LblError> {
    let root = get_data_root();
    let mut tree = HashMap::new();

    if root.is_dir() {
        let candidates = if let Some(requested) = query.networks.clone() {
            requested
        } else {
            get_child_dirnames(&root)?
        };

        for name in candidates {
            let network_dir = root.join(&name);
            // Skip networks that are missing, are not directories, or are empty.
            let populated =
                network_dir.is_dir() && std::fs::read_dir(&network_dir)?.count() != 0;
            if populated {
                let collections = get_network_collection_paths(network_dir, query)?;
                tree.insert(name, collections);
            }
        }
    }

    Ok(tree)
}
fn get_child_dirnames(dir: &PathBuf) -> Result<Vec<String>, LblError> {
let mut directories = Vec::new();
for entry in std::fs::read_dir(dir)? {
let path = entry?.path();
if path.is_dir() {
directories.push(get_leaf_dirname(&path)?);
}
}
Ok(directories)
}
/// Returns the final path component of `dir_path` as a `String`.
///
/// The path is canonicalized first, so the name is taken from the resolved
/// absolute path (symlinks and `.` / `..` components are resolved by
/// `Path::canonicalize`). Non-UTF-8 names are converted lossily.
///
/// # Errors
///
/// Returns an error if the path cannot be canonicalized, or
/// `LblError::LblError("could not convert path")` if the canonical path has
/// no final component.
fn get_leaf_dirname(dir_path: &Path) -> Result<String, LblError> {
    let resolved = dir_path.canonicalize()?;
    match resolved.file_name() {
        Some(leaf) => Ok(leaf.to_string_lossy().into_owned()),
        None => Err(LblError::LblError("could not convert path".to_string())),
    }
}
/// Maps each collection directory inside `network_path` to its collection files.
///
/// Every non-empty child directory becomes one entry, keyed by the name that
/// [`get_leaf_dirname`] returns for it. The value holds the files directly
/// inside that directory that satisfy [`is_collection_file`]; a non-empty
/// directory with no such files still gets an entry with an empty list.
///
/// `_query` is currently unused.
///
/// # Errors
///
/// Returns an error if any directory or directory entry cannot be read, or if
/// [`get_leaf_dirname`] fails.
fn get_network_collection_paths(
    network_path: PathBuf,
    _query: &Query,
) -> Result<HashMap<String, Vec<std::path::PathBuf>>, LblError> {
    let mut by_collection = HashMap::new();

    for entry in std::fs::read_dir(network_path)? {
        let collection_dir = entry?.path();

        // Plain files and empty directories are not collections.
        if !collection_dir.is_dir() || std::fs::read_dir(&collection_dir)?.count() == 0 {
            continue;
        }

        let collection_name = get_leaf_dirname(&collection_dir)?;

        // Stop at the first unreadable entry, then keep only collection files.
        let entries = std::fs::read_dir(collection_dir)?
            .map(|file| file.map(|f| f.path()))
            .collect::<Result<Vec<_>, _>>()?;
        let files: Vec<std::path::PathBuf> = entries
            .into_iter()
            .filter(|candidate| is_collection_file(candidate))
            .collect();

        by_collection.insert(collection_name, files);
    }

    Ok(by_collection)
}
/// Reports whether `path` has a collection file extension.
///
/// The extension is compared case-insensitively against `parquet` and `csv`.
/// Paths without an extension, or with an extension that is not valid UTF-8,
/// are rejected.
fn is_collection_file(path: &Path) -> bool {
    path.extension()
        .map(|ext| ext.to_str().unwrap_or("").to_lowercase())
        .map_or(false, |ext| matches!(ext.as_str(), "parquet" | "csv"))
}
/// Loads every collection file matching `query` into a single `DataFrame`.
///
/// Files are found with [`query_collection_paths_flat`] and split by extension
/// into Parquet and CSV sets. Each non-empty set is scanned lazily, filtered
/// with [`add_query_filters`], collected, and passed through
/// `crate::standardize_collection`. If both sets are present, the CSV rows are
/// stacked below the Parquet rows.
///
/// # Errors
///
/// Returns `LblError::LblError("no valid paths")` if no Parquet or CSV file
/// matched, and propagates any error from path discovery, scanning,
/// filtering, collecting, standardizing or stacking.
pub fn query(query: &Query) -> Result<DataFrame, LblError> {
    let mut csv_paths = Vec::new();
    let mut parquet_paths = Vec::new();
    for path in query_collection_paths_flat(query)? {
        // Compare case-insensitively, as `is_collection_file` does; otherwise
        // files such as `labels.CSV` pass discovery and are silently dropped here.
        let extension = path
            .extension()
            .and_then(|ext| ext.to_str())
            .map(str::to_lowercase);
        match extension.as_deref() {
            Some("csv") => csv_paths.push(path),
            Some("parquet") => parquet_paths.push(path),
            _ => (),
        }
    }

    let parquet_data = if parquet_paths.is_empty() {
        None
    } else {
        // `Arc<[T]>: From<Vec<T>>` builds the shared slice without an extra clone.
        let paths: Arc<[PathBuf]> = Arc::from(parquet_paths);
        let lf = LazyFrame::scan_parquet_files(paths, ScanArgsParquet::default())?;
        let lf = add_query_filters(lf, query, false)?;
        let parquet_data = lf.collect()?;
        Some(crate::standardize_collection(
            parquet_data,
            &CollectionData::default(),
        )?)
    };

    let csv_data = if csv_paths.is_empty() {
        None
    } else {
        let paths: Arc<[PathBuf]> = Arc::from(csv_paths);
        let lf = LazyCsvReader::new_paths(paths).finish()?;
        let lf = add_query_filters(lf, query, true)?;
        let csv_data = lf.collect()?;
        Some(crate::standardize_collection(
            csv_data,
            &CollectionData::default(),
        )?)
    };

    match (parquet_data, csv_data) {
        (Some(parquet_data), Some(csv_data)) => Ok(parquet_data.vstack(&csv_data)?),
        (Some(parquet_data), None) => Ok(parquet_data),
        (None, Some(csv_data)) => Ok(csv_data),
        (None, None) => Err(LblError::LblError("no valid paths".to_string())),
    }
}
/// Adds the filters described by `query` to the lazy frame `lf`.
///
/// Only address filtering is implemented. When `query.addresses` is `None`,
/// `lf` is returned unchanged. When it holds exactly one address, rows are
/// kept where the `address` column equals it:
///
/// * `csv == true`: the address is rendered as a `0x`-prefixed lowercase hex
///   string and compared with the lowercased `address` column;
/// * `csv == false`: the raw bytes are compared directly with the `address`
///   column as a binary literal. String operations are deliberately not
///   applied here, since the comparison value is binary, not text.
///   NOTE(review): this assumes Parquet collections store `address` as a
///   binary column — confirm against the writer.
///
/// # Errors
///
/// Returns `LblError::LblError("not implemented multiple addresses")` when
/// `query.addresses` holds any number of addresses other than exactly one.
fn add_query_filters(lf: LazyFrame, query: &Query, csv: bool) -> Result<LazyFrame, LblError> {
    let addresses = match query.addresses.clone() {
        Some(addresses) => addresses,
        None => return Ok(lf),
    };

    // Exactly one address is supported; pull at most two to find out.
    let mut addresses = addresses.into_iter();
    let address = match (addresses.next(), addresses.next()) {
        (Some(address), None) => address,
        _ => {
            return Err(LblError::LblError(
                "not implemented multiple addresses".to_string(),
            ))
        }
    };

    let predicate = if csv {
        let literal = Expr::Literal(LiteralValue::String(format!(
            "0x{}",
            hex::encode(address)
        )));
        col("address").str().to_lowercase().eq(literal)
    } else {
        col("address").eq(Expr::Literal(LiteralValue::Binary(address)))
    };

    Ok(lf.filter(predicate))
}