use std::collections::BTreeMap;
use std::path::Path;
use arrow::array::{Array, LargeStringArray, StringArray};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use datasynth_core::distributions::behavioral_priors::{AccountSemantic, CoaSemanticPrior};
use crate::error::{FingerprintError, FingerprintResult};
/// Column names accepted as the account-number column.
///
/// Names are compared against lower-cased schema field names, and the list is
/// scanned in order, so an earlier entry wins over a later one when a file
/// contains several matching columns.
const ACCOUNT_NUMBER_CANDIDATES: &[&str] = &[
// NOTE(review): "c" is a very generic name and has top priority — confirm
// which source format relies on it.
"c",
"account",
"gl_account",
"account_number",
"account_no",
"glaccount",
"gl account",
// NOTE(review): presumably an SAP field name — confirm.
"saknr",
];
/// Column names accepted as the account description, in priority order.
///
/// Compared against lower-cased schema field names.
const DESCRIPTION_CANDIDATES: &[&str] = &[
"gl account name",
"gl_account_name",
"description",
"account_name",
"account_description",
"name",
"text",
// NOTE(review): presumably an SAP field name — confirm.
"txt50",
];
/// Column names accepted as the account type, in priority order.
///
/// Compared against lower-cased schema field names.
const ACCOUNT_TYPE_CANDIDATES: &[&str] =
&["account type", "account_type", "acct_type", "type", "ktoks"];
/// Column names accepted as the account class, in priority order.
///
/// Compared against lower-cased schema field names.
const ACCOUNT_CLASS_CANDIDATES: &[&str] = &["account class", "account_class", "class"];
/// Column names accepted as the account sub type, in priority order.
///
/// Compared against lower-cased schema field names.
const ACCOUNT_SUB_TYPE_CANDIDATES: &[&str] = &[
"account sub type",
"account_sub_type",
"sub_type",
"subtype",
];
/// Column names accepted as the account sub class, in priority order.
///
/// Compared against lower-cased schema field names.
const ACCOUNT_SUB_CLASS_CANDIDATES: &[&str] = &[
"account sub class",
"account_sub_class",
"sub_class",
"subclass",
];
pub fn extract_coa_semantic_from_parquet(path: &Path) -> FingerprintResult<CoaSemanticPrior> {
let file = std::fs::File::open(path).map_err(|e| {
FingerprintError::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("COA parquet open failed: {e}"),
))
})?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
FingerprintError::InvalidFormat(format!("COA parquet: cannot build reader: {e}"))
})?;
let reader = builder.build().map_err(|e| {
FingerprintError::InvalidFormat(format!("COA parquet: cannot open reader: {e}"))
})?;
let mut accounts: BTreeMap<String, AccountSemantic> = BTreeMap::new();
for batch_res in reader {
let batch = batch_res.map_err(|e| {
FingerprintError::InvalidFormat(format!("COA parquet: batch read error: {e}"))
})?;
let schema = batch.schema();
let col_names: Vec<String> = schema
.fields()
.iter()
.map(|f| f.name().to_lowercase())
.collect();
let acct_idx = find_column_index(&col_names, ACCOUNT_NUMBER_CANDIDATES);
let desc_idx = find_column_index(&col_names, DESCRIPTION_CANDIDATES);
let type_idx = find_column_index(&col_names, ACCOUNT_TYPE_CANDIDATES);
let class_idx = find_column_index(&col_names, ACCOUNT_CLASS_CANDIDATES);
let sub_type_idx = find_column_index(&col_names, ACCOUNT_SUB_TYPE_CANDIDATES);
let sub_class_idx = find_column_index(&col_names, ACCOUNT_SUB_CLASS_CANDIDATES);
let Some(acct_col) = acct_idx else {
tracing::warn!(
"extract_coa_semantic_from_parquet: no account-number column found in {:?}; columns={:?}",
path,
col_names
);
continue;
};
let n_rows = batch.num_rows();
let acct_arr: Option<StringArray> = string_column(&batch, acct_col);
let desc_arr: Option<StringArray> = desc_idx.and_then(|i| string_column(&batch, i));
let type_arr: Option<StringArray> = type_idx.and_then(|i| string_column(&batch, i));
let class_arr: Option<StringArray> = class_idx.and_then(|i| string_column(&batch, i));
let sub_type_arr: Option<StringArray> = sub_type_idx.and_then(|i| string_column(&batch, i));
let sub_class_arr: Option<StringArray> =
sub_class_idx.and_then(|i| string_column(&batch, i));
for row in 0..n_rows {
let account_number = match &acct_arr {
Some(arr) => {
if arr.is_null(row) {
continue;
}
let v = arr.value(row).trim().to_string();
if v.is_empty() {
continue;
}
v
}
None => continue,
};
if accounts.contains_key(&account_number) {
continue;
}
let description = desc_arr
.as_ref()
.map(|arr| {
if arr.is_null(row) {
String::new()
} else {
arr.value(row).trim().to_string()
}
})
.unwrap_or_default();
let account_type = opt_string(&type_arr, row);
let account_class = opt_string(&class_arr, row);
let account_class_name = opt_string(&sub_type_arr, row);
let account_sub_class = opt_string(&sub_class_arr, row);
let account_sub_class_name = account_sub_class.clone();
accounts.insert(
account_number,
AccountSemantic {
description,
account_type,
account_class,
account_class_name,
account_sub_class,
account_sub_class_name,
parent_account: None,
},
);
}
}
Ok(CoaSemanticPrior { accounts })
}
/// Returns the index in `col_names` of the highest-priority candidate present.
///
/// Candidates are tried in the order given; the first candidate that equals
/// some column name (exact, case-sensitive comparison) decides the result, and
/// the index of that column's first occurrence is returned. Returns `None`
/// when no candidate matches.
fn find_column_index(col_names: &[String], candidates: &[&str]) -> Option<usize> {
    candidates.iter().find_map(|&wanted| {
        col_names
            .iter()
            .position(|name| name.as_str() == wanted)
    })
}
/// Reads column `col_idx` of `batch` as a `StringArray`.
///
/// * `Utf8` columns are returned as a clone of the existing array.
/// * `LargeUtf8` columns are rebuilt value by value, preserving nulls.
/// * `Dictionary` columns are cast to `Utf8` first.
///
/// Any other data type, a failed downcast, or a failed cast yields `None`.
fn string_column(batch: &arrow::record_batch::RecordBatch, col_idx: usize) -> Option<StringArray> {
    let column = batch.column(col_idx);
    match column.data_type() {
        DataType::Utf8 => column.as_any().downcast_ref::<StringArray>().cloned(),
        DataType::LargeUtf8 => {
            let wide = column.as_any().downcast_ref::<LargeStringArray>()?;
            // Copy each slot across, keeping nulls as nulls.
            let narrowed: StringArray = (0..wide.len())
                .map(|i| (!wide.is_null(i)).then(|| wide.value(i)))
                .collect();
            Some(narrowed)
        }
        DataType::Dictionary(_, _) => {
            let decoded = cast(column.as_ref(), &DataType::Utf8).ok()?;
            decoded.as_any().downcast_ref::<StringArray>().cloned()
        }
        _ => None,
    }
}
/// Returns the trimmed value at `row` of an optional string column.
///
/// Yields `None` when the column is absent, the slot is null, or the value is
/// empty after trimming surrounding whitespace.
fn opt_string(col: &Option<StringArray>, row: usize) -> Option<String> {
    col.as_ref()
        .filter(|values| !values.is_null(row))
        .map(|values| values.value(row).trim())
        .filter(|text| !text.is_empty())
        .map(str::to_string)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::aggregation::industry_aggregator::aggregate_coa_semantic;
    use datasynth_core::distributions::behavioral_priors::CoaSemanticPrior;

    /// Builds a prior that contains exactly one account.
    fn single_account_prior(number: &str, semantic: AccountSemantic) -> CoaSemanticPrior {
        let mut prior = CoaSemanticPrior::default();
        prior.accounts.insert(number.to_string(), semantic);
        prior
    }

    /// Accounts that appear in only one input must all survive the merge.
    #[test]
    fn aggregate_coa_semantic_merges_disjoint_accounts() {
        let a = single_account_prior(
            "1000",
            AccountSemantic {
                description: "Cash".to_string(),
                account_type: Some("Assets".to_string()),
                ..Default::default()
            },
        );
        let b = single_account_prior(
            "2000",
            AccountSemantic {
                description: "AP Control".to_string(),
                account_type: Some("Liabilities".to_string()),
                ..Default::default()
            },
        );

        let merged = aggregate_coa_semantic(&[&a, &b]);

        assert_eq!(merged.accounts.len(), 2);
        assert_eq!(merged.accounts["1000"].description, "Cash");
        assert_eq!(merged.accounts["2000"].description, "AP Control");
    }

    /// When inputs disagree on a description, the most frequent one is kept.
    #[test]
    fn aggregate_coa_semantic_picks_mode_description() {
        let described = |desc: &str| {
            single_account_prior(
                "1000",
                AccountSemantic {
                    description: desc.to_string(),
                    ..Default::default()
                },
            )
        };
        let a = described("Kasse");
        let b = described("Kasse");
        let c = described("Petty Cash");

        let merged = aggregate_coa_semantic(&[&a, &b, &c]);

        assert_eq!(merged.accounts["1000"].description, "Kasse");
    }

    /// Optional fields set in only one input must not be lost by the merge.
    #[test]
    fn aggregate_coa_semantic_preserves_optional_fields() {
        let a = single_account_prior(
            "3000",
            AccountSemantic {
                description: "Revenue".to_string(),
                account_type: Some("Revenue".to_string()),
                account_class: Some("R _ Revenue".to_string()),
                ..Default::default()
            },
        );
        let b = single_account_prior(
            "3000",
            AccountSemantic {
                description: "Revenue".to_string(),
                ..Default::default()
            },
        );

        let merged = aggregate_coa_semantic(&[&a, &b]);

        let sem = &merged.accounts["3000"];
        assert_eq!(sem.account_type.as_deref(), Some("Revenue"));
        assert_eq!(sem.account_class.as_deref(), Some("R _ Revenue"));
    }
}