use std::collections::{BTreeMap, HashSet};
use std::path::Path;
use arrow::array::Array;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use datasynth_core::distributions::behavioral_priors::ApproverPrior;
use super::tb_extractor::{find_column_index, float64_column, string_column};
use crate::error::{FingerprintError, FingerprintResult};
/// Suggested minimum number of observations a source needs before it gets its
/// own entry in the per-source approval map.
///
/// NOTE(review): nothing in this file reads the constant; presumably callers
/// pass it as `min_observations_per_source` — confirm against call sites.
pub const DEFAULT_MIN_APPROVER_OBSERVATIONS: usize = 100;

/// Candidate names for the "entered by" column. The extractor hands this list
/// to `find_column_index` together with the lowercased schema field names, so
/// every entry is spelled in lowercase.
const ENTERER_CANDIDATES: &[&str] = &[
    "enterby",
    "enter_by",
    "enteredby",
    "usnam",
    "created_by",
];

/// Candidate (lowercase) names for the approver column.
const APPROVER_CANDIDATES: &[&str] = &[
    "approverid",
    "approver_id",
    "approver",
    "approvedby",
    "approved_by",
];

/// Candidate (lowercase) names for the optional source / document-type column.
const SOURCE_CANDIDATES: &[&str] = &["source", "blart", "doctype"];

/// Candidate (lowercase) names for the optional journal-entry number column.
const JE_NUMBER_CANDIDATES: &[&str] = &[
    "je number",
    "je_number",
    "document_number",
    "doc_number",
];

/// Candidate (lowercase) names for the optional amount column.
const AMOUNT_CANDIDATES: &[&str] = &["functional amount", "functional_amount", "amount"];
/// Derives an [`ApproverPrior`] from a journal-entry parquet file.
///
/// Column lookup is done on lowercased schema field names. The enterer and
/// approver columns are mandatory; the source, JE-number and amount columns
/// are optional and only refine the counting when present.
///
/// Counting rules, applied per row in this order:
/// 1. Rows whose enterer is null or blank (after trimming) are ignored.
/// 2. If an amount column was found, rows whose amount is missing or exactly
///    `0.0` are ignored.
/// 3. If a JE-number column was found, only the first surviving row of each
///    non-blank JE number is counted; rows with a null/blank JE number are
///    each counted on their own.
/// 4. A counted row is "approved" when its trimmed approver is non-empty, and
///    "self-approved" when the approver equals the enterer ignoring ASCII case.
///
/// Batches in which the enterer or approver column cannot be read through
/// `string_column` are skipped entirely.
///
/// # Arguments
/// * `path` - location of the parquet file.
/// * `min_observations_per_source` - a source is listed in `by_source` only
///   when it has at least this many counted rows.
///
/// # Returns
/// * `Ok(None)` when the enterer or approver column is absent, or when no row
///   survives the filters above.
/// * `Ok(Some(prior))` otherwise, where `approval_share` is approved / counted,
///   `self_approval_rate` is self-approved / approved (or `0.0` when nothing
///   was approved), `by_source` holds the per-source approval share and
///   `n_observations` is the number of counted rows.
///
/// # Errors
/// * [`FingerprintError::Io`] when the file cannot be opened. The error keeps
///   the kind reported by the operating system.
/// * [`FingerprintError::InvalidFormat`] when the parquet reader cannot be
///   built or a record batch fails to decode.
pub fn extract_approver_prior_from_parquet(
    path: &Path,
    min_observations_per_source: usize,
) -> FingerprintResult<Option<ApproverPrior>> {
    let file = std::fs::File::open(path).map_err(|e| {
        // Preserve the real failure kind (permission denied, is-a-directory, ...)
        // instead of reporting every open failure as `NotFound`.
        FingerprintError::Io(std::io::Error::new(
            e.kind(),
            format!("JE parquet open failed: {e}"),
        ))
    })?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
        FingerprintError::InvalidFormat(format!("JE parquet: cannot build reader: {e}"))
    })?;

    // Candidate lists are lowercase, so normalise the schema names once.
    let col_names: Vec<String> = builder
        .schema()
        .fields()
        .iter()
        .map(|f| f.name().to_lowercase())
        .collect();

    // Without both user columns there is nothing to measure.
    let Some(enterer_idx) = find_column_index(&col_names, ENTERER_CANDIDATES) else {
        return Ok(None);
    };
    let Some(approver_idx) = find_column_index(&col_names, APPROVER_CANDIDATES) else {
        return Ok(None);
    };
    let source_idx = find_column_index(&col_names, SOURCE_CANDIDATES);
    let je_idx = find_column_index(&col_names, JE_NUMBER_CANDIDATES);
    let amount_idx = find_column_index(&col_names, AMOUNT_CANDIDATES);

    let reader = builder.build().map_err(|e| {
        FingerprintError::InvalidFormat(format!("JE parquet: cannot open reader: {e}"))
    })?;

    let mut total = 0usize;
    let mut approved = 0usize;
    let mut self_approved = 0usize;
    // source -> (counted rows, approved rows)
    let mut per_source: BTreeMap<String, (usize, usize)> = BTreeMap::new();
    // JE numbers already counted; persists across batches.
    let mut seen_jes: HashSet<String> = HashSet::new();

    for batch_res in reader {
        let batch = batch_res.map_err(|e| {
            FingerprintError::InvalidFormat(format!("JE parquet: batch read error: {e}"))
        })?;
        let Some(enterer_arr) = string_column(&batch, enterer_idx) else {
            continue;
        };
        let Some(approver_arr) = string_column(&batch, approver_idx) else {
            continue;
        };
        let source_arr = source_idx.and_then(|i| string_column(&batch, i));
        let je_arr = je_idx.and_then(|i| string_column(&batch, i));
        // NOTE(review): indexed by row below, so this assumes `float64_column`
        // yields one entry per batch row — confirm against the helper.
        let amount_vals = amount_idx.map(|i| float64_column(&batch, i));

        for row in 0..batch.num_rows() {
            if enterer_arr.is_null(row) {
                continue;
            }
            let enterer = enterer_arr.value(row).trim();
            if enterer.is_empty() {
                continue;
            }

            // Missing or zero amounts are excluded when an amount column exists.
            if let Some(amts) = &amount_vals {
                match amts[row] {
                    Some(v) if v != 0.0 => {}
                    _ => continue,
                }
            }

            // Count each journal entry once; allocate only for unseen numbers.
            if let Some(jes) = &je_arr {
                if !jes.is_null(row) {
                    let je = jes.value(row).trim();
                    if !je.is_empty() {
                        if seen_jes.contains(je) {
                            continue;
                        }
                        seen_jes.insert(je.to_owned());
                    }
                }
            }

            total += 1;

            let approver = if approver_arr.is_null(row) {
                ""
            } else {
                approver_arr.value(row).trim()
            };
            let has_approver = !approver.is_empty();
            if has_approver {
                approved += 1;
                if approver.eq_ignore_ascii_case(enterer) {
                    self_approved += 1;
                }
            }

            if let Some(sources) = &source_arr {
                if !sources.is_null(row) {
                    let source = sources.value(row).trim();
                    if !source.is_empty() {
                        let entry = per_source.entry(source.to_string()).or_insert((0, 0));
                        entry.0 += 1;
                        if has_approver {
                            entry.1 += 1;
                        }
                    }
                }
            }
        }
    }

    if total == 0 {
        return Ok(None);
    }

    // Drop sources with too few observations before computing their share.
    let by_source: BTreeMap<String, f64> = per_source
        .into_iter()
        .filter(|(_, (n, _))| *n >= min_observations_per_source)
        .map(|(s, (n, a))| (s, a as f64 / n as f64))
        .collect();

    Ok(Some(ApproverPrior {
        approval_share: approved as f64 / total as f64,
        self_approval_rate: if approved > 0 {
            self_approved as f64 / approved as f64
        } else {
            0.0
        },
        by_source,
        n_observations: total,
    }))
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Float64Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use std::fs::File;
    use std::sync::Arc;

    /// Builds a UTF-8 column from `values`. With `blank_is_null` set, empty
    /// strings are stored as nulls instead of empty values.
    fn utf8_column<'a>(
        values: impl Iterator<Item = &'a str>,
        blank_is_null: bool,
    ) -> Arc<dyn Array> {
        let cells: Vec<Option<&str>> = values
            .map(|v| {
                if blank_is_null && v.is_empty() {
                    None
                } else {
                    Some(v)
                }
            })
            .collect();
        Arc::new(StringArray::from(cells))
    }

    /// Declares a nullable UTF-8 field called `name`.
    fn utf8_field(name: &str) -> Field {
        Field::new(name, DataType::Utf8, true)
    }

    /// Writes `columns` as one record batch into a new parquet file at `path`.
    fn write_single_batch(path: &Path, fields: Vec<Field>, columns: Vec<Arc<dyn Array>>) {
        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(schema.clone(), columns).expect("batch");
        let file = File::create(path).expect("create");
        let mut writer = ArrowWriter::try_new(file, schema, None).expect("writer");
        writer.write(&batch).expect("write");
        writer.close().expect("close");
    }

    /// Writes `(source, enterer, approver)` rows; a blank approver becomes null.
    fn write_parquet(path: &Path, rows: &[(&str, &str, &str)]) {
        write_single_batch(
            path,
            vec![
                utf8_field("Source"),
                utf8_field("EnterBy"),
                utf8_field("ApproverID"),
            ],
            vec![
                utf8_column(rows.iter().map(|r| r.0), false),
                utf8_column(rows.iter().map(|r| r.1), false),
                utf8_column(rows.iter().map(|r| r.2), true),
            ],
        );
    }

    /// Runs the extractor and unwraps both the `Result` and the `Option`.
    fn extract_some(path: &Path, min_observations_per_source: usize) -> ApproverPrior {
        extract_approver_prior_from_parquet(path, min_observations_per_source)
            .expect("extract")
            .expect("Some")
    }

    #[test]
    fn approver_prior_extracted_from_known_book() {
        let tmp = tempfile::tempdir().expect("tmpdir");
        let path = tmp.path().join("je.parquet");
        // 8 lines, 6 approved; "u1"/"u1" and "u2"/"U2" are the self-approvals.
        let rows = [
            ("SA", "u1", "m1"),
            ("SA", "u1", "u1"),
            ("SA", "u2", "m1"),
            ("SA", "u2", "U2"),
            ("RE", "u3", "m2"),
            ("RE", "u3", "m2"),
            ("RE", "u4", ""),
            ("RE", "u4", ""),
        ];
        write_parquet(&path, &rows);

        let ap = extract_some(&path, 1);

        assert_eq!(ap.n_observations, 8);
        assert!((ap.approval_share - 0.75).abs() < 1e-9);
        assert!((ap.self_approval_rate - 2.0 / 6.0).abs() < 1e-9);
        assert!((ap.by_source["SA"] - 1.0).abs() < 1e-9);
        assert!((ap.by_source["RE"] - 0.5).abs() < 1e-9);
    }

    #[test]
    fn je_number_column_switches_approver_shares_to_per_je() {
        let tmp = tempfile::tempdir().expect("tmpdir");
        let path = tmp.path().join("je.parquet");
        // (je number, source, enterer, approver): three lines of J1, one of J2.
        let rows: [(&str, &str, &str, &str); 4] = [
            ("2024/J1", "SA", "u1", "m1"),
            ("2024/J1", "SA", "u1", "m1"),
            ("2024/J1", "SA", "u1", "m1"),
            ("2024/J2", "SA", "u2", ""),
        ];
        write_single_batch(
            &path,
            vec![
                utf8_field("JE Number"),
                utf8_field("Source"),
                utf8_field("EnterBy"),
                utf8_field("ApproverID"),
            ],
            vec![
                utf8_column(rows.iter().map(|r| r.0), false),
                utf8_column(rows.iter().map(|r| r.1), false),
                utf8_column(rows.iter().map(|r| r.2), false),
                utf8_column(rows.iter().map(|r| r.3), true),
            ],
        );

        let ap = extract_some(&path, 1);

        assert_eq!(ap.n_observations, 2, "JEs counted, not lines");
        assert!(
            (ap.approval_share - 0.5).abs() < 1e-12,
            "per-JE 0.5, not line-level 0.75"
        );
        assert!((ap.by_source["SA"] - 0.5).abs() < 1e-12);
    }

    #[test]
    fn zero_amount_jes_excluded_when_amount_column_present() {
        let tmp = tempfile::tempdir().expect("tmpdir");
        let path = tmp.path().join("je.parquet");
        // (je number, source, enterer, approver, amount); J1 carries a zero amount.
        let rows: [(&str, &str, &str, &str, f64); 3] = [
            ("2024/J1", "SA", "u1", "m1", 0.0),
            ("2024/J2", "SA", "u2", "m1", 10.0),
            ("2024/J3", "SA", "u3", "", 5.0),
        ];
        let amounts: Vec<Option<f64>> = rows.iter().map(|r| Some(r.4)).collect();
        let amount_column: Arc<dyn Array> = Arc::new(Float64Array::from(amounts));
        write_single_batch(
            &path,
            vec![
                utf8_field("JE Number"),
                utf8_field("Source"),
                utf8_field("EnterBy"),
                utf8_field("ApproverID"),
                Field::new("Functional Amount", DataType::Float64, true),
            ],
            vec![
                utf8_column(rows.iter().map(|r| r.0), false),
                utf8_column(rows.iter().map(|r| r.1), false),
                utf8_column(rows.iter().map(|r| r.2), false),
                utf8_column(rows.iter().map(|r| r.3), true),
                amount_column,
            ],
        );

        let ap = extract_some(&path, 1);

        assert_eq!(ap.n_observations, 2, "zero-amount JE must not count");
        assert!((ap.approval_share - 0.5).abs() < 1e-12);
    }

    #[test]
    fn approver_prior_edges() {
        let tmp = tempfile::tempdir().expect("tmpdir");

        // A threshold above every source's row count leaves `by_source` empty.
        let path = tmp.path().join("je2.parquet");
        let rows = [("SA", "u1", "m1"), ("SA", "u2", ""), ("RE", "u3", "m2")];
        write_parquet(&path, &rows);
        let ap = extract_some(&path, 100);
        assert!(ap.by_source.is_empty());
        assert!((ap.approval_share - 2.0 / 3.0).abs() < 1e-9);

        // A file without any approver column yields `None`.
        let path2 = tmp.path().join("je3.parquet");
        write_single_batch(
            &path2,
            vec![utf8_field("Source"), utf8_field("EnterBy")],
            vec![
                utf8_column(std::iter::once("SA"), false),
                utf8_column(std::iter::once("u1"), false),
            ],
        );
        assert!(extract_approver_prior_from_parquet(&path2, 1)
            .expect("extract")
            .is_none());
    }
}