use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::io::BufReader;
use crate::datasets::sec::error::{Result, SecError};
use crate::datasets::sec::layout::Workdir;
use crate::datasets::sec::parsers::submissions::{
iter_submissions_zip, open_submissions_zip, read_submission_by_cik,
};
use crate::datasets::sec::slicing::SliceSpec;
use super::super::sinks::Sinks;
use super::super::util::strip_leading_zeros;
use super::Identities;
/// Counters gathered by `emit_from_submissions` while it walks submission records.
#[derive(Debug, Clone, Default)]
pub struct CompanyEmitReport {
/// Incremented once per submission whose company record was handed to
/// `Identities::ensure_company` without an error.
pub companies_written: usize,
/// Incremented each time a submission was skipped because parsing it, or
/// looking it up in the bulk archive, reported an error.
pub submission_parse_errors: usize,
/// Size of the SIC code -> description map at the end of the run.
pub distinct_sic_codes: usize,
/// Number of data rows written to the `filing_index` CSV.
pub filings_indexed: usize,
}
/// Emits company records and the `filing_index` CSV from SEC submission data.
///
/// The set of CIKs to process is chosen as follows:
/// 1. `slice.cik_list`, when present (processed in ascending order);
/// 2. otherwise the CIKs found by `discover_corpus_ciks`, when that list is non-empty;
/// 3. otherwise every entry of the bulk submissions zip is streamed.
///
/// In the first two modes each CIK is loaded from `CIK<10-digit>.json` in the raw
/// submissions directory when such a file exists, and from the bulk zip otherwise.
/// The zip is opened lazily, on the first CIK that needs it.
///
/// A submission that cannot be read or parsed is skipped and counted in
/// `CompanyEmitReport::submission_parse_errors`; it does not abort the run.
///
/// # Returns
/// The run counters and a map from SIC code to the first description seen for it.
///
/// # Errors
/// * `SecError::Malformed` if the bulk submissions zip is missing, or if the
///   `filing_index` CSV cannot be created or written.
/// * `SecError::Io` if the bulk zip cannot be opened or the CSV cannot be flushed.
/// * Any error returned by the zip helpers or by `emit_one_submission`.
pub fn emit_from_submissions(
    workdir: &Workdir,
    slice: &SliceSpec,
    sinks: &mut Sinks,
    identities: &mut Identities,
) -> Result<(CompanyEmitReport, HashMap<String, String>)> {
    // The bulk zip is required up front, even if every CIK later turns out to
    // have an individual JSON file.
    let zip_path = workdir.raw_submissions_zip();
    if !zip_path.is_file() {
        return Err(SecError::Malformed(format!(
            "missing {}; run fetch_submissions_bulk first",
            zip_path.display()
        )));
    }

    let mut report = CompanyEmitReport::default();
    let mut sic_index: HashMap<String, String> = HashMap::new();

    let filing_index_path = workdir.processed_csv("filing_index");
    let mut filing_index = csv::WriterBuilder::new()
        .quote_style(csv::QuoteStyle::Necessary)
        .buffer_capacity(512 * 1024)
        .from_path(&filing_index_path)
        .map_err(|e| SecError::Malformed(format!("filing_index.csv open: {}", e)))?;
    filing_index
        .write_record([
            "accession_number",
            "cik",
            "form_type",
            "filed_date",
            "report_date",
            "primary_document",
        ])
        .map_err(|e| SecError::Malformed(format!("filing_index.csv header: {}", e)))?;

    // `None` means "no targeted CIK list": fall back to scanning the whole zip.
    let effective_ciks: Option<Vec<u64>> = match &slice.cik_list {
        Some(cik_set) => {
            let mut v: Vec<u64> = cik_set.iter().copied().collect();
            v.sort_unstable();
            Some(v)
        }
        None => {
            let derived = discover_corpus_ciks(workdir);
            (!derived.is_empty()).then_some(derived)
        }
    };

    if let Some(ciks) = effective_ciks {
        let subs_dir = workdir.raw_submissions_dir();
        // Opened on demand so runs served entirely by per-CIK files never touch the zip.
        let mut bulk_zip: Option<_> = None;
        for cik in ciks {
            let individual = subs_dir.join(format!("CIK{cik:010}.json"));
            let sub = if individual.is_file() {
                let parsed = std::fs::read_to_string(&individual).ok().and_then(|json| {
                    crate::datasets::sec::parsers::submissions::parse_submission_json(&json).ok()
                });
                // Count unreadable files as well as unparseable ones, so a
                // dropped company is always visible in the report.
                if parsed.is_none() {
                    report.submission_parse_errors += 1;
                }
                parsed
            } else {
                let archive = match bulk_zip.as_mut() {
                    Some(archive) => archive,
                    None => {
                        let zip_file = File::open(&zip_path).map_err(SecError::Io)?;
                        bulk_zip.insert(open_submissions_zip(BufReader::new(zip_file))?)
                    }
                };
                match read_submission_by_cik(archive, cik) {
                    Ok(found) => found,
                    Err(_) => {
                        report.submission_parse_errors += 1;
                        None
                    }
                }
            };
            if let Some(sub) = sub {
                emit_one_submission(
                    &sub,
                    slice,
                    sinks,
                    identities,
                    &mut filing_index,
                    &mut sic_index,
                    &mut report,
                )?;
            }
        }
    } else {
        let zip_file = File::open(&zip_path).map_err(SecError::Io)?;
        for entry in iter_submissions_zip(BufReader::new(zip_file))? {
            match entry {
                Ok((_name, sub)) => emit_one_submission(
                    &sub,
                    slice,
                    sinks,
                    identities,
                    &mut filing_index,
                    &mut sic_index,
                    &mut report,
                )?,
                Err(_) => report.submission_parse_errors += 1,
            }
        }
    }

    // Flush explicitly: an error during the implicit flush on drop would be lost.
    filing_index.flush().map_err(SecError::Io)?;
    report.distinct_sic_codes = sic_index.len();
    Ok((report, sic_index))
}
/// Emits the company record for one submission and appends its matching
/// filings to the `filing_index` CSV.
///
/// The submission is ignored (returning `Ok(())`) when its CIK or name is
/// empty, or when `slice.cik_matches` rejects its CIK. A CIK that does not
/// parse as `u64` is presented to the slice as `0`.
///
/// For an accepted submission the function:
/// * registers the company through `identities.ensure_company`,
/// * records the SIC code -> description pair if the code is non-empty and
///   not yet present in `sic_index`,
/// * writes one CSV row per filing that has a non-empty accession number and
///   passes both `slice.form_matches` and `slice.date_matches`.
///
/// Filing columns shorter than `accession_number` are read as empty strings.
///
/// # Errors
/// Propagates errors from `ensure_company`; CSV write failures are reported
/// as `SecError::Malformed`.
#[allow(clippy::too_many_arguments)]
fn emit_one_submission(
    sub: &crate::datasets::sec::parsers::submissions::Submission,
    slice: &SliceSpec,
    sinks: &mut Sinks,
    identities: &mut Identities,
    filing_index: &mut csv::Writer<File>,
    sic_index: &mut HashMap<String, String>,
    report: &mut CompanyEmitReport,
) -> Result<()> {
    let company = &sub.company;
    let filings = &sub.filings;

    // A record needs both a CIK and a name to be usable.
    let has_identity = !company.cik.is_empty() && !company.name.is_empty();
    if !has_identity {
        return Ok(());
    }

    let numeric_cik = company.cik.parse::<u64>().unwrap_or(0);
    if !slice.cik_matches(numeric_cik) {
        return Ok(());
    }

    let cik = strip_leading_zeros(&company.cik);
    let tickers = company.tickers.join("; ");
    let exchanges = company.exchanges.join("; ");
    identities.ensure_company(
        sinks,
        cik.as_str(),
        company.name.as_str(),
        company.sic.as_str(),
        company.sic_description.as_str(),
        company.state_of_incorporation.as_str(),
        company.fiscal_year_end.as_str(),
        &tickers,
        &exchanges,
        company.entity_type.as_str(),
        company.former_names.as_str(),
    )?;
    report.companies_written += 1;

    // First description seen for a SIC code wins.
    if !company.sic.is_empty() {
        sic_index
            .entry(company.sic.clone())
            .or_insert_with(|| company.sic_description.clone());
    }

    // Stand-in for cells missing from a column shorter than `accession_number`.
    let blank = String::new();
    for (row, accession) in filings.accession_number.iter().enumerate() {
        if accession.is_empty() {
            continue;
        }

        let form = filings.form.get(row).unwrap_or(&blank);
        let filed = filings.filing_date.get(row).unwrap_or(&blank);
        let selected = slice.form_matches(form) && slice.date_matches(filed);
        if !selected {
            continue;
        }

        let report_date = filings.report_date.get(row).unwrap_or(&blank);
        let primary = filings.primary_document.get(row).unwrap_or(&blank);
        let record = [
            accession.as_str(),
            cik.as_str(),
            form.as_str(),
            filed.as_str(),
            report_date.as_str(),
            primary.as_str(),
        ];
        filing_index
            .write_record(record)
            .map_err(|e| SecError::Malformed(format!("filing_index.csv row: {}", e)))?;
        report.filings_indexed += 1;
    }

    Ok(())
}
fn discover_corpus_ciks(workdir: &Workdir) -> Vec<u64> {
let mut ciks: BTreeSet<u64> = BTreeSet::new();
if let Ok(entries) = std::fs::read_dir(workdir.raw_filings_dir()) {
for entry in entries.flatten() {
if !entry.path().is_dir() {
continue;
}
let fname = entry.file_name();
if let Some(cik) = fname.to_str().and_then(|n| n.parse::<u64>().ok()) {
ciks.insert(cik);
}
}
}
if let Ok(entries) = std::fs::read_dir(workdir.raw_financials_dir()) {
for entry in entries.flatten() {
let fname = entry.file_name();
if let Some(cik) = fname.to_str().and_then(parse_company_facts_cik) {
ciks.insert(cik);
}
}
}
ciks.into_iter().collect()
}
/// Extracts the CIK from a company-facts file name of the form
/// `companyfacts_CIK<digits>.json`.
///
/// Leading zeros are accepted (`companyfacts_CIK0000320193.json` -> `320193`).
/// Returns `None` when the prefix or suffix is missing, when the middle part
/// is empty or contains anything other than ASCII digits, or when the number
/// does not fit in a `u64`.
fn parse_company_facts_cik(name: &str) -> Option<u64> {
    let digits = name
        .strip_prefix("companyfacts_CIK")?
        .strip_suffix(".json")?;
    // `u64::from_str` tolerates a leading `+`; a genuine CIK segment is digits
    // only, so reject anything else before parsing.
    if !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // Still fallible: covers the empty string and values that overflow `u64`.
    digits.parse::<u64>().ok()
}
/// Writes the SIC lookup table to the processed `sic` CSV.
///
/// The file gets a `sic,description` header followed by one row per entry of
/// `sic_index`, ordered by SIC code (string comparison).
///
/// # Errors
/// `SecError::Malformed` if the file cannot be created or a record cannot be
/// written; `SecError::Io` if the final flush fails.
pub fn emit_sic_index(workdir: &Workdir, sic_index: &HashMap<String, String>) -> Result<()> {
    let path = workdir.processed_csv("sic");
    let mut writer = csv::WriterBuilder::new()
        .quote_style(csv::QuoteStyle::Necessary)
        .from_path(&path)
        .map_err(|e| SecError::Malformed(format!("sic.csv open: {}", e)))?;

    writer
        .write_record(["sic", "description"])
        .map_err(|e| SecError::Malformed(format!("sic.csv header: {}", e)))?;

    // Map keys are unique, so an unstable sort yields a fully determined order.
    let mut rows: Vec<(&String, &String)> = sic_index.iter().collect();
    rows.sort_unstable_by_key(|&(code, _)| code);

    for (code, description) in rows {
        writer
            .write_record([code.as_str(), description.as_str()])
            .map_err(|e| SecError::Malformed(format!("sic.csv row: {}", e)))?;
    }

    writer.flush().map_err(SecError::Io)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Scratch directory that is deleted when the guard is dropped, so a
    /// failing assertion cannot leave it behind.
    struct ScratchDir(std::path::PathBuf);

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort cleanup: a leftover temp dir must not fail the test.
            std::fs::remove_dir_all(&self.0).ok();
        }
    }

    /// Creates a fresh, uniquely named directory under the system temp dir.
    fn tempdir() -> ScratchDir {
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Tests run on parallel threads of one process, and the clock may be
        // coarse; the sequence number keeps concurrent callers apart.
        static NEXT_ID: AtomicUsize = AtomicUsize::new(0);

        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let dir = std::env::temp_dir().join(format!(
            "kglite-sec-companies-test-{}-{}-{}",
            std::process::id(),
            nanos,
            NEXT_ID.fetch_add(1, Ordering::Relaxed)
        ));
        std::fs::create_dir_all(&dir).unwrap();
        ScratchDir(dir)
    }

    /// CIKs from filing directories and company-facts files are merged,
    /// deduplicated and sorted; unrelated names are ignored.
    #[test]
    fn discover_corpus_ciks_unions_filings_and_financials() {
        let tmp = tempdir();
        let wd = Workdir::new(&tmp.0);
        wd.ensure_dirs(None).unwrap();
        for cik in ["320193", "1318605", "789019"] {
            std::fs::create_dir_all(wd.raw_filings_dir().join(cik)).unwrap();
        }
        std::fs::create_dir_all(wd.raw_filings_dir().join("not-a-cik")).unwrap();
        for name in [
            "companyfacts_CIK0000051143.json",
            "companyfacts_CIK0000320193.json",
        ] {
            std::fs::write(wd.raw_financials_dir().join(name), b"{}").unwrap();
        }
        std::fs::write(wd.raw_financials_dir().join("README.txt"), b"").unwrap();
        assert_eq!(
            discover_corpus_ciks(&wd),
            vec![51143, 320193, 789019, 1318605]
        );
    }

    /// With no raw directories on disk the discovery yields nothing.
    #[test]
    fn discover_corpus_ciks_empty_when_no_raw_tree() {
        let tmp = tempdir();
        let wd = Workdir::new(&tmp.0);
        assert!(discover_corpus_ciks(&wd).is_empty());
    }

    /// Only `companyfacts_CIK<digits>.json` names produce a CIK.
    #[test]
    fn parse_company_facts_cik_handles_variants() {
        assert_eq!(
            parse_company_facts_cik("companyfacts_CIK0000320193.json"),
            Some(320193)
        );
        assert_eq!(parse_company_facts_cik("README.txt"), None);
        assert_eq!(parse_company_facts_cik("companyfacts_CIK.json"), None);
        assert_eq!(parse_company_facts_cik("submissions.zip"), None);
    }
}