1use crate::types::*;
2use polars::prelude::*;
3use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5
/// Resolve the root directory under which lbl data is stored.
///
/// Resolution order:
/// 1. the `LBL_DATA_DIR` environment variable, used verbatim when set;
/// 2. otherwise `<home>/.lbl`, where `<home>` is `HOME`, falling back to
///    `USERPROFILE`, falling back to `.` when neither is set.
///
/// Variables are read with `var_os` so that values which are not valid UTF-8
/// (legal paths on Unix) are honoured instead of being silently skipped.
pub fn get_data_root() -> std::path::PathBuf {
    if let Some(data_dir) = std::env::var_os("LBL_DATA_DIR") {
        return std::path::PathBuf::from(data_dir);
    }
    let home_dir = std::env::var_os("HOME")
        .or_else(|| std::env::var_os("USERPROFILE"))
        .unwrap_or_else(|| std::ffi::OsString::from("."));
    std::path::PathBuf::from(home_dir).join(".lbl")
}
21
22pub fn list_networks(query: &Query) -> Result<Vec<String>, LblError> {
24 let paths = query_collection_paths_by_network(query)?;
25 Ok(paths.into_keys().collect())
26}
27
28pub fn list_collections(query: &Query) -> Result<Vec<String>, LblError> {
30 let paths = query_collection_paths_by_collection(query)?;
31 Ok(paths.into_keys().collect())
32}
33
34pub fn query_collection_paths_flat(query: &Query) -> Result<Vec<PathBuf>, LblError> {
36 let mut paths = Vec::new();
37 let tree = query_collection_paths_tree(query)?;
38 for network_collections in tree.into_values() {
39 for collection_paths in network_collections.into_values() {
40 paths.extend(collection_paths);
41 }
42 }
43 Ok(paths)
44}
45
46pub fn query_collection_paths_by_network(
48 query: &Query,
49) -> Result<HashMap<String, Vec<PathBuf>>, LblError> {
50 let mut paths = HashMap::new();
51 let tree = query_collection_paths_tree(query)?;
52 for (network, network_collections) in tree.into_iter() {
53 if !paths.contains_key(&network) {
54 paths.insert(network.clone(), Vec::new());
55 }
56 if let Some(cv) = paths.get_mut(&network) {
57 for collection_paths in network_collections.into_values() {
58 cv.extend(collection_paths);
59 }
60 }
61 }
62 Ok(paths)
63}
64
65pub fn query_collection_paths_by_collection(
67 query: &Query,
68) -> Result<HashMap<String, Vec<PathBuf>>, LblError> {
69 let mut paths = HashMap::new();
70 let tree = query_collection_paths_tree(query)?;
71 for network_collections in tree.into_values() {
72 for (collection, collection_paths) in network_collections.into_iter() {
73 if !paths.contains_key(&collection) {
74 paths.insert(collection.clone(), Vec::new());
75 }
76 if let Some(cv) = paths.get_mut(&collection) {
77 cv.extend(collection_paths);
78 }
79 }
80 }
81 Ok(paths)
82}
83
84pub fn query_collection_paths_tree(
86 query: &Query,
87) -> Result<HashMap<String, HashMap<String, Vec<std::path::PathBuf>>>, LblError> {
88 let mut paths = HashMap::new();
89
90 let data_root = get_data_root();
92 if !data_root.is_dir() {
93 return Ok(paths);
94 }
95
96 let networks = match query.networks.clone() {
98 Some(networks) => networks,
99 None => get_child_dirnames(&data_root)?,
100 };
101
102 for network in networks.into_iter() {
104 let mut network_path = data_root.clone();
105 network_path.push(network.clone());
106 if network_path.is_dir() && (std::fs::read_dir(&network_path)?.count() > 0) {
107 paths.insert(network, get_network_collection_paths(network_path, query)?);
108 }
109 }
110
111 Ok(paths)
112}
113
114fn get_child_dirnames(dir: &PathBuf) -> Result<Vec<String>, LblError> {
116 let mut directories = Vec::new();
117 for entry in std::fs::read_dir(dir)? {
118 let path = entry?.path();
119 if path.is_dir() {
120 directories.push(get_leaf_dirname(&path)?);
121 }
122 }
123
124 Ok(directories)
125}
126
127fn get_leaf_dirname(dir_path: &Path) -> Result<String, LblError> {
129 let name = dir_path
130 .canonicalize()?
131 .file_name()
132 .ok_or(LblError::LblError("could not convert path".to_string()))?
133 .to_string_lossy()
134 .into();
135 Ok(name)
136}
137
138fn get_network_collection_paths(
140 network_path: PathBuf,
141 _query: &Query,
142) -> Result<HashMap<String, Vec<std::path::PathBuf>>, LblError> {
143 let mut paths = HashMap::new();
144 for child_path in std::fs::read_dir(network_path)? {
145 let child_path = child_path?.path();
146 if child_path.is_dir() && (std::fs::read_dir(&child_path)?.count() > 0) {
147 let child_name = get_leaf_dirname(&child_path)?;
148 let mut collection_paths = Vec::new();
149 for file in std::fs::read_dir(child_path)? {
150 let file = file?.path();
151 if is_collection_file(&file) {
152 collection_paths.push(file);
153 }
154 }
155 paths.insert(child_name, collection_paths);
156 }
157 }
158 Ok(paths)
159}
160
/// Report whether `path` names a collection data file.
///
/// A collection file has a `parquet` or `csv` extension, compared
/// case-insensitively. Paths with no extension, or whose extension is not
/// valid UTF-8, are rejected.
fn is_collection_file(path: &Path) -> bool {
    path.extension().map_or(false, |raw_ext| {
        let lowered = raw_ext.to_str().unwrap_or("").to_lowercase();
        matches!(lowered.as_str(), "parquet" | "csv")
    })
}
171
172pub fn query(query: &Query) -> Result<DataFrame, LblError> {
174 let paths = query_collection_paths_flat(query)?;
175
176 let mut csv_paths = Vec::new();
177 let mut parquet_paths = Vec::new();
178
179 for path in paths {
180 match path.extension().and_then(|s| s.to_str()) {
181 Some("csv") => csv_paths.push(path),
182 Some("parquet") => parquet_paths.push(path),
183 _ => (),
184 }
185 }
186
187 let parquet_data = if !parquet_paths.is_empty() {
188 let arc_vec: Arc<Vec<PathBuf>> = Arc::new(parquet_paths);
189 let arc_slice: Arc<[PathBuf]> = Arc::from(arc_vec.as_ref().clone().into_boxed_slice());
190 let opts = ScanArgsParquet::default();
191 let lf = LazyFrame::scan_parquet_files(arc_slice, opts)?;
192 let lf = add_query_filters(lf, query, false)?;
193 let parquet_data = lf.collect()?;
194 let parquet_data = crate::standardize_collection(parquet_data, &CollectionData::default())?;
195 Some(parquet_data)
196 } else {
197 None
198 };
199
200 let csv_data = if !csv_paths.is_empty() {
201 let arc_vec: Arc<Vec<PathBuf>> = Arc::new(csv_paths);
202 let arc_slice: Arc<[PathBuf]> = Arc::from(arc_vec.as_ref().clone().into_boxed_slice());
203 let lf = LazyCsvReader::new_paths(arc_slice).finish()?;
204 let lf = add_query_filters(lf, query, true)?;
205 let csv_data = lf.collect()?;
206 let csv_data = crate::standardize_collection(csv_data, &CollectionData::default())?;
207 Some(csv_data)
208 } else {
209 None
210 };
211
212 match (parquet_data, csv_data) {
213 (Some(parquet_data), Some(csv_data)) => Ok(parquet_data.vstack(&csv_data)?),
214 (Some(parquet_data), None) => Ok(parquet_data),
215 (None, Some(csv_data)) => Ok(csv_data),
216 (None, None) => Err(LblError::LblError("no valid paths".to_string())),
217 }
218}
219
220fn add_query_filters(lf: LazyFrame, query: &Query, csv: bool) -> Result<LazyFrame, LblError> {
221 let lf = if let Some(addresses) = query.addresses.clone() {
223 if csv {
224 let addresses: Vec<Expr> = addresses
225 .into_iter()
226 .map(|address| {
227 Expr::Literal(LiteralValue::String(format!("0x{}", hex::encode(address))))
228 })
229 .collect();
230 if addresses.len() == 1 {
232 let address = addresses
233 .first()
234 .ok_or(LblError::LblError("could not get address".to_string()))?;
235 lf.filter(col("address").str().to_lowercase().eq(address.clone()))
236 } else {
237 return Err(LblError::LblError(
238 "not implemented multiple addresses".to_string(),
239 ));
240 }
241 } else {
242 let addresses: Vec<Expr> = addresses
243 .into_iter()
244 .map(|address| Expr::Literal(LiteralValue::Binary(address)))
245 .collect();
246 if addresses.len() == 1 {
248 let address = addresses
249 .first()
250 .ok_or(LblError::LblError("could not get address".to_string()))?;
251 lf.filter(col("address").str().to_lowercase().eq(address.clone()))
252 } else {
253 return Err(LblError::LblError(
254 "not implemented multiple addresses".to_string(),
255 ));
256 }
257 }
258 } else {
259 lf
260 };
261
262 Ok(lf)
263}