use std::collections::HashMap;
use std::path::{Path, PathBuf};
use rayon::prelude::*;
use crate::datasets::sec::error::Result;
use crate::datasets::sec::layout::Workdir;
/// Outcome of attempting to parse a single file.
///
/// [`par_parse_emit`] consumes these values: `Parsed` payloads are handed to
/// the emit callback, `Skipped` is ignored, and `Failed` increments the error
/// count.
///
/// The derived impls are only available when `R` itself implements the
/// corresponding trait, so they place no extra requirement on existing users.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FileParse<R> {
    /// The file was parsed successfully and produced a value.
    Parsed(R),
    /// The file was deliberately not parsed; counted as neither emitted nor failed.
    Skipped,
    /// Parsing was attempted and did not succeed.
    Failed,
}
/// Parses `paths` in parallel, chunk by chunk, and emits the results sequentially.
///
/// The slice is cut into chunks of `chunk_size` paths (a `chunk_size` of zero is
/// treated as one). Every path of a chunk is run through `parse_one` on the
/// rayon thread pool, and the collected outcomes are then walked on the calling
/// thread in the same order as the input paths, so `emit` never runs
/// concurrently. Only one chunk's worth of parse results is held at a time.
///
/// Returns `(emitted, errors)`: the number of `FileParse::Parsed` values that
/// `emit` accepted and the number of `FileParse::Failed` outcomes.
///
/// # Errors
///
/// Stops at, and returns, the first error produced by `emit`.
pub fn par_parse_emit<R, P, E>(
    paths: &[PathBuf],
    chunk_size: usize,
    parse_one: P,
    mut emit: E,
) -> Result<(usize, usize)>
where
    R: Send,
    P: Fn(&Path) -> FileParse<R> + Sync + Send,
    E: FnMut(&Path, R) -> Result<()>,
{
    // `chunks` panics on a zero size, so clamp it up front.
    let batch_len = chunk_size.max(1);
    let (mut emitted, mut failed) = (0usize, 0usize);

    for batch in paths.chunks(batch_len) {
        // Parallel phase: collecting into a Vec keeps outcomes in input order.
        let outcomes: Vec<FileParse<R>> = batch
            .par_iter()
            .map(|file| parse_one(file.as_path()))
            .collect();

        // Sequential phase: pair each outcome back up with its source path.
        for (file, outcome) in batch.iter().zip(outcomes) {
            match outcome {
                FileParse::Failed => failed += 1,
                FileParse::Skipped => {}
                FileParse::Parsed(record) => {
                    emit(file.as_path(), record)?;
                    emitted += 1;
                }
            }
        }
    }

    Ok((emitted, failed))
}
/// A chunk size of 256.
///
/// NOTE(review): presumably intended as the `chunk_size` argument of
/// [`par_parse_emit`] — confirm against call sites.
pub const PARSE_CHUNK: usize = 256;
/// Collects the files that sit exactly two directory levels below `root`
/// (`<root>/<dir>/<dir>/<file>`) and whose file name satisfies `is_match`.
///
/// The walk is best-effort: a `root` or sub-directory that cannot be read, and
/// individual directory entries that fail to load, are skipped silently. A file
/// name that is not valid UTF-8 is presented to `is_match` as the empty string.
/// Third-level entries are not checked for being regular files.
///
/// The order of the returned paths follows the order in which the operating
/// system lists directory entries.
///
/// # Errors
///
/// This implementation never returns `Err`; an unreadable `root` yields an
/// empty vector.
pub fn walk_filings<F>(root: &Path, is_match: F) -> Result<Vec<PathBuf>>
where
    F: Fn(&str) -> bool,
{
    // Lazily lists the entries of `dir`; an unreadable directory or entry
    // contributes nothing.
    fn children(dir: PathBuf) -> impl Iterator<Item = PathBuf> {
        std::fs::read_dir(dir)
            .into_iter()
            .flatten()
            .flatten()
            .map(|entry| entry.path())
    }

    let found: Vec<PathBuf> = children(root.to_path_buf())
        .filter(|first_level| first_level.is_dir())
        .flat_map(children)
        .filter(|second_level| second_level.is_dir())
        .flat_map(children)
        .filter(|file| {
            let file_name = file.file_name().and_then(|n| n.to_str()).unwrap_or("");
            is_match(file_name)
        })
        .collect();

    Ok(found)
}
/// Returns `true` when `name` ends with the `.xml` extension, in any letter case.
///
/// Only the extension is inspected; nothing else about the name is checked.
/// The comparison is ASCII case-insensitive, in line with the other file-name
/// matchers in this module that lowercase the name before testing it, so a
/// name such as `DOC.XML` is accepted as well.
pub fn is_ownership_xml(name: &str) -> bool {
    name.to_ascii_lowercase().ends_with(".xml")
}
/// Builds an accession-number → form-type map from the `filing_index` CSV that
/// `workdir.processed_csv` points at.
///
/// Loading is best-effort: if the file cannot be opened, its header row cannot
/// be read, or either the `accession_number` or `form_type` column is absent,
/// an empty map is returned. Rows that fail to parse and rows with an empty
/// value in either column are skipped. When an accession number occurs more
/// than once, the last row wins.
fn load_form_index(workdir: &Workdir) -> HashMap<String, String> {
    let mut by_accession = HashMap::new();

    let mut reader = match csv::Reader::from_path(workdir.processed_csv("filing_index")) {
        Ok(reader) => reader,
        Err(_) => return by_accession,
    };
    let header_row = match reader.headers() {
        Ok(row) => row.clone(),
        Err(_) => return by_accession,
    };

    // Locate the two columns of interest by header name.
    let column_of = |wanted: &str| header_row.iter().position(|title| title == wanted);
    let (acc_idx, form_idx) = match (column_of("accession_number"), column_of("form_type")) {
        (Some(acc_idx), Some(form_idx)) => (acc_idx, form_idx),
        _ => return by_accession,
    };

    for row in reader.records() {
        let Ok(row) = row else {
            continue;
        };
        let (Some(accession), Some(form_type)) = (row.get(acc_idx), row.get(form_idx)) else {
            continue;
        };
        if accession.is_empty() || form_type.is_empty() {
            continue;
        }
        by_accession.insert(accession.to_owned(), form_type.to_owned());
    }

    by_accession
}
/// Lists the `.htm`, `.html` and `.txt` files under `root` that belong to a
/// filing whose indexed form type is one of `forms`.
///
/// The form index is loaded from `workdir` first. Candidate files are gathered
/// with [`walk_filings`] using an ASCII case-insensitive extension check, and a
/// candidate is kept only when the accession derived from its parent directory
/// ([`accession_from_path`]) is present in the index with a form type that
/// appears in `forms` (exact string comparison).
///
/// # Errors
///
/// Propagates any error returned by [`walk_filings`].
pub fn walk_filings_of_form(
    workdir: &Workdir,
    root: &Path,
    forms: &[&str],
) -> Result<Vec<PathBuf>> {
    let form_by_accession = load_form_index(workdir);

    let has_text_extension = |name: &str| {
        let lower = name.to_ascii_lowercase();
        [".htm", ".html", ".txt"]
            .iter()
            .any(|ext| lower.ends_with(*ext))
    };

    let mut candidates = walk_filings(root, has_text_extension)?;
    // Drop, in place, every file whose filing is unknown or of another form.
    candidates.retain(|path| {
        let Some(accession) = accession_from_path(path) else {
            return false;
        };
        match form_by_accession.get(&accession) {
            Some(form_type) => forms.contains(&form_type.as_str()),
            None => false,
        }
    });

    Ok(candidates)
}
/// Lists the files under `root` accepted by `ext_predicate` whose filing is
/// present in the form index loaded from `workdir`.
///
/// A file is kept when the accession derived from its parent directory
/// ([`accession_from_path`]) is a key of the index; the form type itself is not
/// consulted.
///
/// # Errors
///
/// Propagates any error returned by [`walk_filings`].
pub fn walk_filings_in_index(
    workdir: &Workdir,
    root: &Path,
    ext_predicate: impl Fn(&str) -> bool,
) -> Result<Vec<PathBuf>> {
    let known = load_form_index(workdir);

    let mut kept = Vec::new();
    for path in walk_filings(root, ext_predicate)? {
        match accession_from_path(&path) {
            Some(accession) if known.contains_key(&accession) => kept.push(path),
            _ => {}
        }
    }

    Ok(kept)
}
/// Returns `true` when `name` is an `.xml` file whose name contains `13f` or
/// `infotable`.
///
/// All comparisons are ASCII case-insensitive. Previously only the `13f`
/// marker tolerated an upper-case letter, so names such as `InfoTable.xml` or
/// `FORM13F.XML` were rejected; every name that matched before still matches.
pub fn is_13f_xml(name: &str) -> bool {
    let lower = name.to_ascii_lowercase();
    lower.ends_with(".xml") && (lower.contains("13f") || lower.contains("infotable"))
}
/// Returns `true` when `name` looks like an Exhibit 21 document.
///
/// After ASCII-lowercasing, the name must end in `.htm`, `.html` or `.txt` and
/// contain one of `ex21`, `exhibit21`, `ex-21` or `exhibit-21` anywhere in it.
/// This is a plain substring test, so unrelated names that happen to contain a
/// marker (for example `index21.htm`) are accepted too.
pub fn is_exhibit21_name(name: &str) -> bool {
    const EXTENSIONS: [&str; 3] = [".htm", ".html", ".txt"];
    const MARKERS: [&str; 4] = ["ex21", "exhibit21", "ex-21", "exhibit-21"];

    let lower = name.to_ascii_lowercase();
    let extension_ok = EXTENSIONS.iter().any(|ext| lower.ends_with(*ext));
    extension_ok && MARKERS.iter().any(|marker| lower.contains(*marker))
}
/// Returns the name of the directory two levels above `path`.
///
/// For a file laid out as `<root>/<cik>/<accession>/<file>` — the layout that
/// [`walk_filings`] traverses — this is the `<cik>` component. Returns `None`
/// when the path has no such ancestor, when that ancestor has no final name
/// component, or when the name is not valid UTF-8.
pub fn cik_from_filing_path(path: &Path) -> Option<String> {
    // `ancestors()` yields the path itself, then its parent, then the grandparent.
    let grandparent = path.ancestors().nth(2)?;
    let dir_name = grandparent.file_name()?.to_str()?;
    Some(dir_name.to_owned())
}
pub fn accession_from_path(path: &Path) -> Option<String> {
let acc_dir = path.parent()?.file_name()?.to_str()?;
Some(insert_accession_dashes(acc_dir))
}
/// Rewrites a bare 18-digit accession number as `NNNNNNNNNN-NN-NNNNNN`.
///
/// Dashes are inserted after the 10th and 12th digits. Any input that is not
/// exactly 18 ASCII digits — including one that already contains dashes — is
/// returned unchanged.
pub fn insert_accession_dashes(no_dashes: &str) -> String {
    let is_bare_accession =
        no_dashes.len() == 18 && no_dashes.bytes().all(|b| b.is_ascii_digit());
    if !is_bare_accession {
        return no_dashes.to_owned();
    }

    // All bytes are ASCII digits here, so every offset is a char boundary.
    let (prefix, tail) = no_dashes.split_at(10);
    let (middle, suffix) = tail.split_at(2);
    format!("{}-{}-{}", prefix, middle, suffix)
}
/// Renders a boolean as `"1"` for `true` and `"0"` for `false`.
pub fn bool_str(b: bool) -> &'static str {
    match b {
        true => "1",
        false => "0",
    }
}
/// Formats a float for output, rendering zero as an empty string.
///
/// * `0.0` (and `-0.0`) become `""`.
/// * Values with no fractional part are printed with zero decimal places.
/// * Everything else, including NaN and the infinities, uses the default
///   `Display` formatting.
pub fn format_float(f: f64) -> String {
    if f == 0.0 {
        return String::new();
    }

    let is_whole = f.fract() == 0.0;
    if is_whole {
        format!("{:.0}", f)
    } else {
        format!("{}", f)
    }
}
/// Renders an optional displayable value: `Some(x)` becomes `x.to_string()`,
/// `None` becomes the empty string.
pub fn opt_int<T: std::fmt::Display>(v: Option<T>) -> String {
    v.map_or_else(String::new, |value| value.to_string())
}
/// Removes every leading `'0'` character from `s`.
///
/// If nothing is left afterwards — the input was empty or consisted solely of
/// zeros — the result is `"0"`.
pub fn strip_leading_zeros(s: &str) -> String {
    match s.trim_start_matches('0') {
        "" => "0".to_string(),
        significant => significant.to_string(),
    }
}
/// Builds an identifier by prefixing `cik` with `cik-`.
///
/// The input is used verbatim; no trimming or zero-stripping is applied.
pub fn person_nid_from_cik(cik: &str) -> String {
    const PREFIX: &str = "cik-";

    let mut nid = String::with_capacity(PREFIX.len() + cik.len());
    nid.push_str(PREFIX);
    nid.push_str(cik);
    nid
}
#[cfg(test)]
mod tests {
    use super::*;

    // A bare 18-digit accession gains dashes after the 10th and 12th digits.
    #[test]
    fn dashes_inserted_into_18_digit_accession() {
        let dashed = insert_accession_dashes("000110465925073753");
        assert_eq!(dashed, "0001104659-25-073753");
    }

    // Input that already carries dashes is handed back untouched.
    #[test]
    fn dashes_passthrough_for_already_dashed() {
        let already_dashed = "0001104659-25-073753";
        assert_eq!(insert_accession_dashes(already_dashed), already_dashed);
    }

    // Leading zeros go away, but an all-zero input collapses to a single "0".
    #[test]
    fn strip_zeros_basic() {
        let cases = [
            ("0000320193", "320193"),
            ("320193", "320193"),
            ("0", "0"),
            ("0000", "0"),
        ];
        for (input, expected) in cases {
            assert_eq!(strip_leading_zeros(input), expected);
        }
    }

    // Zero is rendered as an empty field rather than "0".
    #[test]
    fn format_float_zero_is_empty() {
        assert!(format_float(0.0).is_empty());
    }

    // Whole values carry no decimal point.
    #[test]
    fn format_float_whole_integer_form() {
        let rendered = format_float(123.0);
        assert_eq!(rendered, "123");
    }

    // Fractional values keep their fractional digits.
    #[test]
    fn format_float_with_decimal() {
        let rendered = format_float(225.5);
        assert_eq!(rendered, "225.5");
    }

    // Marker spelling and letter case vary; the extension must still be textual.
    #[test]
    fn is_exhibit21_recognises_variants() {
        for accepted in ["ex-21.htm", "Exhibit21.html", "ex21_a.txt"] {
            assert!(is_exhibit21_name(accepted));
        }
        assert!(!is_exhibit21_name("ex-21.pdf"));
    }
}