lbl/filesystem/
query.rs

1use crate::types::*;
2use polars::prelude::*;
3use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5
/// Resolve the root directory that holds all label data.
///
/// The `LBL_DATA_DIR` environment variable is used verbatim when it can be
/// read. Otherwise the root is `<home>/.lbl`, where `<home>` is taken from
/// `HOME`, then `USERPROFILE`, and finally falls back to `.`.
pub fn get_data_root() -> std::path::PathBuf {
    if let Ok(explicit_root) = std::env::var("LBL_DATA_DIR") {
        return std::path::PathBuf::from(explicit_root);
    }

    let home_dir = std::env::var("HOME")
        .or_else(|_| std::env::var("USERPROFILE"))
        .unwrap_or_else(|_| ".".to_string());
    std::path::Path::new(&home_dir).join(".lbl")
}
21
22/// list networks in data directory
23pub fn list_networks(query: &Query) -> Result<Vec<String>, LblError> {
24    let paths = query_collection_paths_by_network(query)?;
25    Ok(paths.into_keys().collect())
26}
27
28/// list collections in data directory
29pub fn list_collections(query: &Query) -> Result<Vec<String>, LblError> {
30    let paths = query_collection_paths_by_collection(query)?;
31    Ok(paths.into_keys().collect())
32}
33
34/// get collection paths
35pub fn query_collection_paths_flat(query: &Query) -> Result<Vec<PathBuf>, LblError> {
36    let mut paths = Vec::new();
37    let tree = query_collection_paths_tree(query)?;
38    for network_collections in tree.into_values() {
39        for collection_paths in network_collections.into_values() {
40            paths.extend(collection_paths);
41        }
42    }
43    Ok(paths)
44}
45
46/// get collection paths by network
47pub fn query_collection_paths_by_network(
48    query: &Query,
49) -> Result<HashMap<String, Vec<PathBuf>>, LblError> {
50    let mut paths = HashMap::new();
51    let tree = query_collection_paths_tree(query)?;
52    for (network, network_collections) in tree.into_iter() {
53        if !paths.contains_key(&network) {
54            paths.insert(network.clone(), Vec::new());
55        }
56        if let Some(cv) = paths.get_mut(&network) {
57            for collection_paths in network_collections.into_values() {
58                cv.extend(collection_paths);
59            }
60        }
61    }
62    Ok(paths)
63}
64
65/// get collection paths by collection
66pub fn query_collection_paths_by_collection(
67    query: &Query,
68) -> Result<HashMap<String, Vec<PathBuf>>, LblError> {
69    let mut paths = HashMap::new();
70    let tree = query_collection_paths_tree(query)?;
71    for network_collections in tree.into_values() {
72        for (collection, collection_paths) in network_collections.into_iter() {
73            if !paths.contains_key(&collection) {
74                paths.insert(collection.clone(), Vec::new());
75            }
76            if let Some(cv) = paths.get_mut(&collection) {
77                cv.extend(collection_paths);
78            }
79        }
80    }
81    Ok(paths)
82}
83
84/// returns a mapping of {network => {collection => [path]}}
85pub fn query_collection_paths_tree(
86    query: &Query,
87) -> Result<HashMap<String, HashMap<String, Vec<std::path::PathBuf>>>, LblError> {
88    let mut paths = HashMap::new();
89
90    // get data root
91    let data_root = get_data_root();
92    if !data_root.is_dir() {
93        return Ok(paths);
94    }
95
96    // get networks
97    let networks = match query.networks.clone() {
98        Some(networks) => networks,
99        None => get_child_dirnames(&data_root)?,
100    };
101
102    // crawl data dir
103    for network in networks.into_iter() {
104        let mut network_path = data_root.clone();
105        network_path.push(network.clone());
106        if network_path.is_dir() && (std::fs::read_dir(&network_path)?.count() > 0) {
107            paths.insert(network, get_network_collection_paths(network_path, query)?);
108        }
109    }
110
111    Ok(paths)
112}
113
114/// get children directories of directory
115fn get_child_dirnames(dir: &PathBuf) -> Result<Vec<String>, LblError> {
116    let mut directories = Vec::new();
117    for entry in std::fs::read_dir(dir)? {
118        let path = entry?.path();
119        if path.is_dir() {
120            directories.push(get_leaf_dirname(&path)?);
121        }
122    }
123
124    Ok(directories)
125}
126
127/// get leaf directory name of path
128fn get_leaf_dirname(dir_path: &Path) -> Result<String, LblError> {
129    let name = dir_path
130        .canonicalize()?
131        .file_name()
132        .ok_or(LblError::LblError("could not convert path".to_string()))?
133        .to_string_lossy()
134        .into();
135    Ok(name)
136}
137
138/// get collection paths of network path
139fn get_network_collection_paths(
140    network_path: PathBuf,
141    _query: &Query,
142) -> Result<HashMap<String, Vec<std::path::PathBuf>>, LblError> {
143    let mut paths = HashMap::new();
144    for child_path in std::fs::read_dir(network_path)? {
145        let child_path = child_path?.path();
146        if child_path.is_dir() && (std::fs::read_dir(&child_path)?.count() > 0) {
147            let child_name = get_leaf_dirname(&child_path)?;
148            let mut collection_paths = Vec::new();
149            for file in std::fs::read_dir(child_path)? {
150                let file = file?.path();
151                if is_collection_file(&file) {
152                    collection_paths.push(file);
153                }
154            }
155            paths.insert(child_name, collection_paths);
156        }
157    }
158    Ok(paths)
159}
160
/// Report whether `path` has a `parquet` or `csv` extension.
///
/// The comparison ignores case. Paths without an extension, or with one that
/// is not valid Unicode, are rejected.
fn is_collection_file(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .map(str::to_lowercase)
        .map_or(false, |ext| matches!(ext.as_str(), "parquet" | "csv"))
}
171
172/// query labels
173pub fn query(query: &Query) -> Result<DataFrame, LblError> {
174    let paths = query_collection_paths_flat(query)?;
175
176    let mut csv_paths = Vec::new();
177    let mut parquet_paths = Vec::new();
178
179    for path in paths {
180        match path.extension().and_then(|s| s.to_str()) {
181            Some("csv") => csv_paths.push(path),
182            Some("parquet") => parquet_paths.push(path),
183            _ => (),
184        }
185    }
186
187    let parquet_data = if !parquet_paths.is_empty() {
188        let arc_vec: Arc<Vec<PathBuf>> = Arc::new(parquet_paths);
189        let arc_slice: Arc<[PathBuf]> = Arc::from(arc_vec.as_ref().clone().into_boxed_slice());
190        let opts = ScanArgsParquet::default();
191        let lf = LazyFrame::scan_parquet_files(arc_slice, opts)?;
192        let lf = add_query_filters(lf, query, false)?;
193        let parquet_data = lf.collect()?;
194        let parquet_data = crate::standardize_collection(parquet_data, &CollectionData::default())?;
195        Some(parquet_data)
196    } else {
197        None
198    };
199
200    let csv_data = if !csv_paths.is_empty() {
201        let arc_vec: Arc<Vec<PathBuf>> = Arc::new(csv_paths);
202        let arc_slice: Arc<[PathBuf]> = Arc::from(arc_vec.as_ref().clone().into_boxed_slice());
203        let lf = LazyCsvReader::new_paths(arc_slice).finish()?;
204        let lf = add_query_filters(lf, query, true)?;
205        let csv_data = lf.collect()?;
206        let csv_data = crate::standardize_collection(csv_data, &CollectionData::default())?;
207        Some(csv_data)
208    } else {
209        None
210    };
211
212    match (parquet_data, csv_data) {
213        (Some(parquet_data), Some(csv_data)) => Ok(parquet_data.vstack(&csv_data)?),
214        (Some(parquet_data), None) => Ok(parquet_data),
215        (None, Some(csv_data)) => Ok(csv_data),
216        (None, None) => Err(LblError::LblError("no valid paths".to_string())),
217    }
218}
219
220fn add_query_filters(lf: LazyFrame, query: &Query, csv: bool) -> Result<LazyFrame, LblError> {
221    // filter addresses
222    let lf = if let Some(addresses) = query.addresses.clone() {
223        if csv {
224            let addresses: Vec<Expr> = addresses
225                .into_iter()
226                .map(|address| {
227                    Expr::Literal(LiteralValue::String(format!("0x{}", hex::encode(address))))
228                })
229                .collect();
230            // lf.filter(col("address").str().to_lowercase().is_in(addresses))
231            if addresses.len() == 1 {
232                let address = addresses
233                    .first()
234                    .ok_or(LblError::LblError("could not get address".to_string()))?;
235                lf.filter(col("address").str().to_lowercase().eq(address.clone()))
236            } else {
237                return Err(LblError::LblError(
238                    "not implemented multiple addresses".to_string(),
239                ));
240            }
241        } else {
242            let addresses: Vec<Expr> = addresses
243                .into_iter()
244                .map(|address| Expr::Literal(LiteralValue::Binary(address)))
245                .collect();
246            // lf.filter(col("address").is_in(addresses))
247            if addresses.len() == 1 {
248                let address = addresses
249                    .first()
250                    .ok_or(LblError::LblError("could not get address".to_string()))?;
251                lf.filter(col("address").str().to_lowercase().eq(address.clone()))
252            } else {
253                return Err(LblError::LblError(
254                    "not implemented multiple addresses".to_string(),
255                ));
256            }
257        }
258    } else {
259        lf
260    };
261
262    Ok(lf)
263}