use crate::types::*;
use polars::prelude::*;
use std::collections::HashMap;
use std::path::PathBuf;
/// Convert the `address` column of `data` from raw binary to "0x"-prefixed
/// hex strings. A frame whose `address` column is not binary is returned
/// unchanged.
///
/// # Errors
/// Returns `LblError` if the `address` column is missing or a polars
/// operation fails.
pub fn address_to_hex(mut data: DataFrame) -> Result<DataFrame, LblError> {
    if let DataType::Binary = data.column("address")?.dtype() {
        // hex_encode yields bare hex, so build a constant "0x" column and
        // concatenate it in front of every value.
        let address = data.column("address")?.binary()?.hex_encode();
        let address = address.str()?;
        let prefix: Series = vec!["0x".to_string(); address.len()].into_iter().collect();
        // Name the result "address" so with_column replaces the old column.
        let prefix = prefix.with_name("address");
        let prefix = prefix.str()?;
        // Mutate in place: `data` is owned, so the previous
        // clone-mutate-clone round trip was two wasted full-frame copies.
        data.with_column(prefix.concat(address))?;
    }
    Ok(data)
}
/// Convert the `address` column of `data` from "0x"-prefixed hex strings
/// back to raw binary. A frame whose `address` column is not a string
/// column is returned unchanged.
///
/// # Errors
/// Returns `LblError` if the `address` column is missing or hex decoding
/// fails (strict mode: invalid hex is an error, not a null).
pub fn address_to_binary(mut data: DataFrame) -> Result<DataFrame, LblError> {
    let address_column = data.column("address")?;
    if let DataType::String = address_column.dtype() {
        let height = address_column.len();
        // Slice off the leading "0x" (offset 2); length 42 covers the
        // remainder of a standard 42-character address.
        let offset: Series = vec![2; height].iter().collect();
        let length: Series = vec![42; height].iter().collect();
        let trimmed_prefix = address_column.str()?.str_slice(&offset, &length)?;
        // `data` is already owned — mutate in place instead of cloning the
        // whole frame on entry and again after with_column.
        data.with_column(trimmed_prefix.hex_decode(true)?)?;
    }
    Ok(data)
}
/// Map each standard label column name to its expected dtype.
pub(crate) fn get_standard_columns() -> HashMap<String, DataType> {
    let mut columns: HashMap<String, DataType> = HashMap::with_capacity(9);
    columns.insert("address".to_string(), DataType::Binary);
    columns.insert("collection".to_string(), DataType::String);
    columns.insert("name".to_string(), DataType::String);
    columns.insert("project".to_string(), DataType::String);
    columns.insert("class".to_string(), DataType::String);
    columns.insert("network".to_string(), DataType::String);
    columns.insert("extra_data".to_string(), DataType::String);
    columns.insert("added_by".to_string(), DataType::String);
    columns.insert("date_added".to_string(), DataType::String);
    columns
}
/// Canonical column order for standardized label frames.
pub(crate) fn get_standard_column_order() -> Vec<String> {
    [
        "address",
        "collection",
        "name",
        "project",
        "class",
        "network",
        "extra_data",
        "added_by",
        "date_added",
    ]
    .iter()
    .map(|name| name.to_string())
    .collect()
}
pub fn standardize_collection(
df: DataFrame,
metadata: &CollectionData,
) -> Result<DataFrame, LblError> {
let mut df = df.clone();
if let (Err(_), Some(collection)) = (df.column("collection"), metadata.collection.clone()) {
let column: Series = vec![collection.clone(); df.height()].into_iter().collect();
df = df.with_column(column.with_name("collection"))?.clone();
};
let df_columns = df.get_column_names();
let mut df = df.clone();
let standard_columns = get_standard_columns();
for (column, dtype) in standard_columns.iter() {
if !df_columns.contains(&column.as_str()) {
let series = create_null_column(column.clone(), dtype.clone(), df.height());
df = df.with_column(series)?.clone();
}
}
for column in df_columns {
if !standard_columns.contains_key(column) {
df = df.drop(column)?;
}
}
let address_column = df.column("address")?;
if let DataType::String = address_column.dtype() {
let height = address_column.len();
let offset: Series = vec![2; height].iter().collect();
let length: Series = vec![42; height].iter().collect();
let trimmed_prefix = address_column.str()?.str_slice(&offset, &length)?;
df = df.with_column(trimmed_prefix.hex_decode(true)?)?.clone();
};
let columns = get_standard_column_order();
let df = df.select(columns)?;
Ok(df)
}
/// Build a Series named `name` of length `len` whose values are all null,
/// typed as `dtype`.
fn create_null_column(name: String, dtype: DataType, len: usize) -> Series {
    // Series::full_null handles every dtype, replacing the hand-rolled
    // String/Binary-only construction that panicked (unimplemented!) on any
    // other dtype. Backward compatible for the two dtypes used today.
    Series::full_null(name.as_str(), len, &dtype)
}
/// Sum the number of rows across all files in `paths`, grouping files by
/// extension and lazily scanning CSV and parquet inputs.
///
/// # Errors
/// Returns `LblError` for unknown file extensions or when a scan or collect
/// fails.
pub fn get_row_counts(paths: Vec<PathBuf>) -> Result<i64, LblError> {
    let paths_by_extension = crate::filesystem::paths_by_extension(paths);
    let mut row_count = 0;
    for (extension, extension_paths) in paths_by_extension.into_iter() {
        // Both scanners take Arc<[PathBuf]>; std's From<Vec<T>> builds it
        // directly, replacing the old Arc<Vec<_>> -> deep clone ->
        // boxed-slice round trip.
        let arc_slice: Arc<[PathBuf]> = Arc::from(extension_paths);
        let lf = match extension.as_str() {
            "csv" => LazyCsvReader::new_paths(arc_slice).finish()?,
            "parquet" => {
                let opts = ScanArgsParquet::default();
                LazyFrame::scan_parquet_files(arc_slice, opts)?
            }
            other => {
                return Err(LblError::LblError(format!(
                    "Unknown file extension: {}",
                    other
                )))
            }
        };
        // count() yields a single row of per-column counts; cast to i64 so
        // the running total accumulates without overflow on large inputs.
        let counts = lf
            .select([col("address")])
            .count()
            .select([col("address").cast(DataType::Int64)])
            .collect()?;
        row_count += counts.column("address")?.i64()?.get(0).unwrap_or(0);
    }
    Ok(row_count)
}
/// Write `df` to `path` as CSV or parquet, chosen by file extension,
/// creating parent directories as needed. CSV output stores addresses as
/// "0x…" hex strings via `address_to_hex`.
///
/// # Errors
/// Returns `LblError` if the directory or file cannot be created, the
/// extension is neither "csv" nor "parquet", or the writer fails.
pub fn write_file(df: &DataFrame, path: &PathBuf) -> Result<(), LblError> {
    // Ensure the target directory exists before creating the file.
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|e| LblError::LblError(format!("could not create directory: {:?}", e)))?;
    }
    // Missing/non-UTF8 extension becomes "" and falls through to the error arm.
    let extension = path
        .extension()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("");
    let file = std::fs::File::create(path)
        .map_err(|e| LblError::LblError(format!("could not write file: {:?}", e)))?;
    // Writers need a mutable frame; work on a copy so the caller's df is untouched.
    let mut df = df.clone();
    match extension {
        "csv" => {
            let mut df = address_to_hex(df)?;
            CsvWriter::new(file).finish(&mut df)?
        }
        "parquet" => {
            // NOTE(review): despite the name, this value is passed to
            // with_row_group_size, which polars treats as the number of ROWS
            // per row group, not the number of groups — for heights below
            // 6400 this yields very small row groups (1..64 rows each).
            // Confirm whether rows-per-group was intended here.
            let n_row_groups = match df.height() {
                0 => 1,
                height if height < 100 => 1,
                height if height < 6400 => height / 100,
                _ => 64,
            };
            ParquetWriter::new(file)
                .with_statistics(true)
                .with_compression(ParquetCompression::Zstd(None))
                .with_row_group_size(Some(n_row_groups))
                .finish(&mut df)?;
        }
        _ => {
            return Err(LblError::LblError(
                "must select either parquet or csv".to_string(),
            ))
        }
    };
    Ok(())
}