use std::fs::create_dir_all;
use std::fs::File;
use anyhow::Error;
use anyhow::anyhow;
use std::path::PathBuf;
use glob::glob;
use flate2::read::MultiGzDecoder;
use zstd::stream::read::Decoder as ZstdDecoder;
use std::io::{BufReader, Cursor, Write, Read};
use flate2::write::GzEncoder;
use flate2::Compression;
use zstd::stream::write::Encoder as ZstdEncoder;
use indicatif::{ProgressBar, ProgressStyle};
/// File-name suffixes that `expand_dirs` searches for inside a directory when
/// the caller does not pass its own extension list.
const VALID_EXTS: &[&str] = &[".jsonl", ".jsonl.gz", ".jsonl.zstd", ".jsonl.zst", ".json.gz", ".json.zst"];
/// Creates a progress bar sized for `num_items` items.
///
/// The bar's template is `units` followed by a fixed layout showing the
/// position, the length, elapsed/estimated durations and a wide bar.
///
/// # Panics
///
/// Panics if the resulting template string is rejected by
/// `ProgressStyle::with_template`.
pub fn build_pbar(num_items: usize, units: &str) -> ProgressBar {
    // Prefix the fixed layout with the caller-supplied unit label.
    let layout = [
        units,
        " {human_pos}/{human_len} [{elapsed_precise}/{duration_precise}] [{wide_bar:.cyan/blue}]",
    ]
    .concat();

    let bar = ProgressBar::new(num_items as u64);
    let bar = bar.with_style(ProgressStyle::with_template(&layout).unwrap());
    // Zero-step increment; presumably triggers an initial draw — confirm against indicatif.
    bar.inc(0);
    bar
}
pub fn expand_dirs(paths: Vec<PathBuf>, manual_ext: Option<&[&str]>) -> Result<Vec<PathBuf>, Error> {
let exts = if !manual_ext.is_none() {
manual_ext.unwrap()
} else {
VALID_EXTS
};
let mut files: Vec<PathBuf> = Vec::new();
for path in paths {
if path.is_dir() {
let path_str = path
.to_str()
.ok_or_else(|| anyhow!("invalid path '{}'", path.to_string_lossy()))?;
for ext in exts {
let pattern = format!("{}/**/*{}", path_str, ext);
for entry in glob(&pattern).expect("Failed to read glob pattern") {
if let Ok(path) = entry {
files.push(path)
}
}
}
} else {
files.push(path.clone());
}
}
Ok(files)
}
/// Reports whether `path` names a JSON file, optionally with one extra
/// suffix after `.json` (for example `data.json`, `data.json.gz`,
/// `data.json.zst`).
///
/// `Path::extension` only yields the text after the final dot, so the
/// `.json.<suffix>` case is detected by looking at the extension of the file
/// stem rather than by searching the last extension for a dot.
///
/// Returns `false` for paths without an extension and for other suffixes such
/// as `.jsonl` or `.jsonl.gz`.
pub fn has_json_extension(path: &PathBuf) -> bool {
    fn is_json(p: &std::path::Path) -> bool {
        p.extension().map_or(false, |ext| ext == "json")
    }

    // Either the last extension is `json`, or the name with its last
    // extension removed ends in `.json`.
    is_json(path)
        || path
            .file_stem()
            .map_or(false, |stem| is_json(std::path::Path::new(stem)))
}
pub fn get_output_filename(input_path: &PathBuf, config_input_dir: &PathBuf, config_output_dir: &PathBuf) -> Result<PathBuf, Error> {
let replaced = input_path.clone()
.strip_prefix(config_input_dir)
.ok()
.map(|stripped| config_output_dir.clone().join(stripped)).unwrap();
Ok(replaced)
}
/// Reads `input_file` fully into memory via `read_local_file_into_memory`
/// and returns a buffered reader over the resulting bytes.
///
/// # Errors
///
/// Propagates any error from reading the file, annotated with the path
/// (previously a read failure caused a panic).
pub fn read_pathbuf_to_mem(input_file: &PathBuf) -> Result<BufReader<Cursor<Vec<u8>>>, Error> {
    let contents = read_local_file_into_memory(input_file).map_err(|e| {
        e.context(format!(
            "Failed to read contents of {:?} into memory",
            input_file
        ))
    })?;
    Ok(BufReader::new(contents))
}
/// Loads `input_file` into memory, decompressing it according to its final
/// extension (compared case-insensitively):
///
/// * `gz` - decoded with `MultiGzDecoder`;
/// * `zstd` / `zst` - decoded with the zstd stream decoder;
/// * anything else, including no extension at all - read verbatim.
///
/// For zstd input, if decoding fails with `UnexpectedEof`, whatever was
/// decoded so far is discarded and the file is decoded again in 64 KiB
/// chunks, keeping all data produced before the stream ends unexpectedly.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or read, or if
/// decompression fails for a reason other than the tolerated
/// `UnexpectedEof` on zstd input.
fn read_local_file_into_memory(input_file: &PathBuf) -> Result<Cursor<Vec<u8>>, Error> {
    let mut file = File::open(input_file)?;
    let mut contents = Vec::new();
    // A missing extension is treated as "not compressed" instead of panicking.
    let ext = input_file
        .extension()
        .map(|e| e.to_string_lossy().to_lowercase())
        .unwrap_or_default();

    match ext.as_str() {
        "gz" => {
            let mut decoder = MultiGzDecoder::new(file);
            decoder.read_to_end(&mut contents)?;
        }
        "zstd" | "zst" => {
            let mut decoder = ZstdDecoder::new(file)?;
            match decoder.read_to_end(&mut contents) {
                Ok(_) => {}
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    // Start over on a fresh handle and salvage everything
                    // that decodes before the stream ends unexpectedly.
                    let file = File::open(input_file)?;
                    let mut decoder = ZstdDecoder::new(file)?;
                    contents.clear();
                    let mut buffer = [0; 64 * 1024];
                    loop {
                        match decoder.read(&mut buffer) {
                            Ok(0) => break,
                            Ok(n) => contents.extend_from_slice(&buffer[..n]),
                            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
                            Err(e) => return Err(e.into()),
                        }
                    }
                }
                Err(e) => return Err(e.into()),
            }
        }
        _ => {
            file.read_to_end(&mut contents)?;
        }
    }
    Ok(Cursor::new(contents))
}
pub fn write_mem_to_pathbuf(contents: &[u8], output_file: &PathBuf) -> Result<(), Error> {
let compressed_data = compress_data(contents.to_vec(), output_file);
if let Some(parent_dir) = output_file.parent() {
if !parent_dir.exists() {
create_dir_all(parent_dir).unwrap()
}
let mut file = File::create(output_file).expect(format!("Unable to create output file {:?}", output_file).as_str());
file.write_all(&compressed_data).expect(format!("Unable to write to {:?}", output_file).as_str());
}
Ok(())
}
/// Compresses `data` according to the final extension of `filename`
/// (compared case-insensitively, matching how the reader in this file
/// interprets extensions):
///
/// * `gz` - gzip at the default compression level;
/// * `zstd` / `zst` - zstd with level argument `0`;
/// * anything else, including a missing or non-UTF-8 extension - `data` is
///   returned unchanged (a missing extension previously panicked).
///
/// # Panics
///
/// Panics if an encoder fails while writing to its in-memory buffer.
fn compress_data(data: Vec<u8>, filename: &PathBuf) -> Vec<u8> {
    let ext = filename
        .extension()
        .and_then(|e| e.to_str())
        .map(str::to_lowercase)
        .unwrap_or_default();

    match ext.as_str() {
        "gz" => {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder
                .write_all(&data)
                .expect("writing to an in-memory gzip encoder failed");
            encoder
                .finish()
                .expect("finishing an in-memory gzip stream failed")
        }
        "zstd" | "zst" => {
            let mut encoder = ZstdEncoder::new(Vec::new(), 0)
                .expect("creating a zstd encoder failed");
            encoder
                .write_all(&data)
                .expect("writing to an in-memory zstd encoder failed");
            encoder
                .finish()
                .expect("finishing an in-memory zstd stream failed")
        }
        _ => data,
    }
}