datasynth-fingerprint 5.36.0

Privacy-preserving synthetic data fingerprinting for DataSynth
Documentation
//! Phase-2 R9 — preparer/approver prior extraction (GH #217 §D.2).
//!
//! Corpus GLs carry enterer and approver columns; neither is part of the eval
//! loader's `Record`, so — like the manual-share and TB extractors — this reads
//! the columns directly from the JE parquet as a side channel. The prior feeds
//! the generator's approver emission (preparer≠approver model) so SoD /
//! approval-chain detector arms have real signal on twins.

use std::collections::{BTreeMap, HashSet};
use std::path::Path;

use arrow::array::Array;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

use datasynth_core::distributions::behavioral_priors::ApproverPrior;

use super::tb_extractor::{find_column_index, float64_column, string_column};
use crate::error::{FingerprintError, FingerprintResult};

/// Default per-source floor: a source needs at least this many enterer-bearing
/// observations before it is given its own approval share.
pub const DEFAULT_MIN_APPROVER_OBSERVATIONS: usize = 100;

/// Header names that identify the enterer (preparer) column; compared exactly
/// against the lower-cased parquet field names.
const ENTERER_CANDIDATES: &[&str] = &["enterby", "enter_by", "enteredby", "usnam", "created_by"];

/// Header names that identify the approver column; compared exactly against
/// the lower-cased parquet field names.
const APPROVER_CANDIDATES: &[&str] = &[
    "approverid",
    "approver_id",
    "approver",
    "approvedby",
    "approved_by",
];

/// Header names that identify the source column; compared exactly against the
/// lower-cased parquet field names.
const SOURCE_CANDIDATES: &[&str] = &["source", "blart", "doctype"];

/// Header names that identify the JE-number column; compared exactly against
/// the lower-cased parquet field names. If one is found, observations are
/// counted once per JE (its first qualifying row), because the generator
/// draws a single approver per JE. `belnr` is intentionally absent. Its
/// document numbers restart each fiscal year, so keying on it would fuse
/// unrelated JEs from different years.
const JE_NUMBER_CANDIDATES: &[&str] = &["je number", "je_number", "document_number", "doc_number"];

/// Header names that identify the functional-amount column; compared exactly
/// against the lower-cased parquet field names. If one is found, rows whose
/// amount is zero or null are ignored. This keeps the population aligned with
/// `source_mix_je` and the manual-share prior.
const AMOUNT_CANDIDATES: &[&str] = &["functional amount", "functional_amount", "amount"];

/// Extract the approver prior from a JE parquet file.
///
/// Denominator: observations with a non-empty enterer.
///
/// - When a JE-number column is present, there is one observation per JE. The
///   JE's first row with a non-empty enterer and (if an amount column exists)
///   a non-zero amount wins. Enterer/approver are header-level in practice.
/// - Otherwise there is one observation per line.
/// - When an amount column is present, zero/null-amount rows (and JEs with
///   only such rows) are excluded.
///
/// `approval_share` is the share of those observations carrying a non-empty
/// approver. `self_approval_rate` is, among approved observations, the share
/// where approver == enterer. The comparison trims both values and is *ASCII*
/// case-insensitive, so non-ASCII IDs compare case-sensitively.
///
/// Per-source approval shares are emitted only for sources with at least
/// `min_observations_per_source` observations. Returns `Ok(None)` when either
/// the enterer or approver column is missing, or no observation qualifies.
/// Batches for which `string_column` yields no array for the enterer or
/// approver column are skipped.
///
/// # Errors
///
/// - [`FingerprintError::Io`] when the file cannot be opened; the underlying
///   [`std::io::ErrorKind`] of the open failure is preserved.
/// - [`FingerprintError::InvalidFormat`] when the parquet reader cannot be
///   built or a record batch fails to decode.
pub fn extract_approver_prior_from_parquet(
    path: &Path,
    min_observations_per_source: usize,
) -> FingerprintResult<Option<ApproverPrior>> {
    // Keep the real failure kind (e.g. PermissionDenied). Hard-coding NotFound
    // would misreport every open failure that is not a missing file.
    let file = std::fs::File::open(path).map_err(|e| {
        FingerprintError::Io(std::io::Error::new(
            e.kind(),
            format!("JE parquet open failed: {e}"),
        ))
    })?;

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
        FingerprintError::InvalidFormat(format!("JE parquet: cannot build reader: {e}"))
    })?;

    // Lower-case the field names so they can be matched against the
    // lower-case candidate lists.
    let col_names: Vec<String> = builder
        .schema()
        .fields()
        .iter()
        .map(|f| f.name().to_lowercase())
        .collect();

    let Some(enterer_idx) = find_column_index(&col_names, ENTERER_CANDIDATES) else {
        return Ok(None);
    };
    let Some(approver_idx) = find_column_index(&col_names, APPROVER_CANDIDATES) else {
        return Ok(None);
    };
    let source_idx = find_column_index(&col_names, SOURCE_CANDIDATES);
    let je_idx = find_column_index(&col_names, JE_NUMBER_CANDIDATES);
    let amount_idx = find_column_index(&col_names, AMOUNT_CANDIDATES);

    let reader = builder.build().map_err(|e| {
        FingerprintError::InvalidFormat(format!("JE parquet: cannot open reader: {e}"))
    })?;

    let mut total = 0usize;
    let mut approved = 0usize;
    let mut self_approved = 0usize;
    // source -> (enterer observations, approved observations)
    let mut per_source: BTreeMap<String, (usize, usize)> = BTreeMap::new();
    // JE numbers already counted (per-JE mode only).
    let mut seen_jes: HashSet<String> = HashSet::new();

    for batch_res in reader {
        let batch = batch_res.map_err(|e| {
            FingerprintError::InvalidFormat(format!("JE parquet: batch read error: {e}"))
        })?;

        let Some(enterer_arr) = string_column(&batch, enterer_idx) else {
            continue;
        };
        let Some(approver_arr) = string_column(&batch, approver_idx) else {
            continue;
        };
        let source_arr = source_idx.and_then(|i| string_column(&batch, i));
        let je_arr = je_idx.and_then(|i| string_column(&batch, i));
        let amount_vals = amount_idx.map(|i| float64_column(&batch, i));

        for row in 0..batch.num_rows() {
            // The enterer and amount gates run before JE dedup. A row rejected
            // here never claims its JE, so a later qualifying line of the same
            // JE can still be counted.
            if enterer_arr.is_null(row) {
                continue;
            }
            let enterer = enterer_arr.value(row).trim();
            if enterer.is_empty() {
                continue;
            }
            // Amount-bearing filter: zero/null-amount rows never count.
            if let Some(amts) = &amount_vals {
                match amts[row] {
                    Some(v) if v != 0.0 => {}
                    _ => continue,
                }
            }
            // Per-JE mode: only the first qualifying row of each JE counts.
            // Rows with a null/blank JE number fall back to per-line counting.
            if let Some(jes) = &je_arr {
                if !jes.is_null(row) {
                    let je = jes.value(row).trim();
                    if !je.is_empty() {
                        // Probe before inserting. Most lines belong to an
                        // already-counted JE, so this avoids allocating a
                        // String for every line.
                        if seen_jes.contains(je) {
                            continue;
                        }
                        seen_jes.insert(je.to_owned());
                    }
                }
            }
            total += 1;
            let approver = if approver_arr.is_null(row) {
                ""
            } else {
                approver_arr.value(row).trim()
            };
            let has_approver = !approver.is_empty();
            if has_approver {
                approved += 1;
                if approver.eq_ignore_ascii_case(enterer) {
                    self_approved += 1;
                }
            }

            if let Some(sources) = &source_arr {
                if !sources.is_null(row) {
                    let source = sources.value(row).trim();
                    if !source.is_empty() {
                        let entry = per_source.entry(source.to_string()).or_default();
                        entry.0 += 1;
                        if has_approver {
                            entry.1 += 1;
                        }
                    }
                }
            }
        }
    }

    if total == 0 {
        return Ok(None);
    }

    // Every entry has n >= 1 (entries are only created when counted), so the
    // division below is safe even when the gate is 0.
    let by_source: BTreeMap<String, f64> = per_source
        .into_iter()
        .filter(|(_, (n, _))| *n >= min_observations_per_source)
        .map(|(s, (n, a))| (s, a as f64 / n as f64))
        .collect();

    Ok(Some(ApproverPrior {
        approval_share: approved as f64 / total as f64,
        self_approval_rate: if approved > 0 {
            self_approved as f64 / approved as f64
        } else {
            0.0
        },
        by_source,
        n_observations: total,
    }))
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, Float64Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use std::fs::File;
    use std::sync::Arc;

    /// Nullable UTF-8 schema field named `name`.
    fn utf8_field(name: &str) -> Field {
        Field::new(name, DataType::Utf8, true)
    }

    /// UTF-8 column built from optional values.
    fn utf8_column(values: Vec<Option<&str>>) -> ArrayRef {
        Arc::new(StringArray::from(values))
    }

    /// Maps `""` to null; the fixtures use an empty string to mean "no approver".
    fn null_if_empty(s: &str) -> Option<&str> {
        (!s.is_empty()).then_some(s)
    }

    /// Write `columns` under a schema built from `fields` as a single-batch parquet file.
    fn write_batch(path: &Path, fields: Vec<Field>, columns: Vec<ArrayRef>) {
        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(schema.clone(), columns).expect("batch");
        let file = File::create(path).expect("create");
        let mut writer = ArrowWriter::try_new(file, schema, None).expect("writer");
        writer.write(&batch).expect("write");
        writer.close().expect("close");
    }

    /// Write a (Source, EnterBy, ApproverID) parquet file; an empty approver is written as null.
    fn write_parquet(path: &Path, rows: &[(&str, &str, &str)]) {
        write_batch(
            path,
            vec![
                utf8_field("Source"),
                utf8_field("EnterBy"),
                utf8_field("ApproverID"),
            ],
            vec![
                utf8_column(rows.iter().map(|r| Some(r.0)).collect()),
                utf8_column(rows.iter().map(|r| Some(r.1)).collect()),
                utf8_column(rows.iter().map(|r| null_if_empty(r.2)).collect()),
            ],
        );
    }

    /// Write a (JE Number, Source, EnterBy, ApproverID) parquet file. When
    /// `amounts` is given, a row-aligned `Functional Amount` column is added.
    /// An empty approver is written as null.
    fn write_je_parquet(
        path: &Path,
        rows: &[(&str, &str, &str, &str)],
        amounts: Option<&[f64]>,
    ) {
        let mut fields = vec![
            utf8_field("JE Number"),
            utf8_field("Source"),
            utf8_field("EnterBy"),
            utf8_field("ApproverID"),
        ];
        let mut columns = vec![
            utf8_column(rows.iter().map(|r| Some(r.0)).collect()),
            utf8_column(rows.iter().map(|r| Some(r.1)).collect()),
            utf8_column(rows.iter().map(|r| Some(r.2)).collect()),
            utf8_column(rows.iter().map(|r| null_if_empty(r.3)).collect()),
        ];
        if let Some(amounts) = amounts {
            assert_eq!(amounts.len(), rows.len(), "amounts must be row-aligned");
            fields.push(Field::new("Functional Amount", DataType::Float64, true));
            let values: Vec<Option<f64>> = amounts.iter().copied().map(Some).collect();
            columns.push(Arc::new(Float64Array::from(values)));
        }
        write_batch(path, fields, columns);
    }

    /// Checks approval share, self-approval rate and per-source shares on a known book.
    /// The book has 8 rows; 6 are approved (share 0.75), of which 2 are
    /// self-approved (rate 1/3). SA is 4/4 approved and RE is 2/4.
    #[test]
    fn approver_prior_extracted_from_known_book() {
        let tmp = tempfile::tempdir().expect("tmpdir");
        let path = tmp.path().join("je.parquet");
        let rows = vec![
            ("SA", "u1", "m1"),
            ("SA", "u1", "u1"), // self
            ("SA", "u2", "m1"),
            ("SA", "u2", "U2"), // self (case-insensitive)
            ("RE", "u3", "m2"),
            ("RE", "u3", "m2"),
            ("RE", "u4", ""),
            ("RE", "u4", ""),
        ];
        write_parquet(&path, &rows);

        let ap = extract_approver_prior_from_parquet(&path, 1)
            .expect("extract")
            .expect("Some");
        assert_eq!(ap.n_observations, 8);
        assert!((ap.approval_share - 0.75).abs() < 1e-9);
        assert!((ap.self_approval_rate - 2.0 / 6.0).abs() < 1e-9);
        assert!((ap.by_source["SA"] - 1.0).abs() < 1e-9);
        assert!((ap.by_source["RE"] - 0.5).abs() < 1e-9);
    }

    /// With a JE-number column present, shares are per-JE (first row per JE),
    /// matching the generator's one-approver-draw-per-JE.
    #[test]
    fn je_number_column_switches_approver_shares_to_per_je() {
        let tmp = tempfile::tempdir().expect("tmpdir");
        let path = tmp.path().join("je.parquet");
        // JE1 approved (3 lines), JE2 unapproved (1 line) -> per-JE 0.5, per-line 0.75.
        let rows = [
            ("2024/J1", "SA", "u1", "m1"),
            ("2024/J1", "SA", "u1", "m1"),
            ("2024/J1", "SA", "u1", "m1"),
            ("2024/J2", "SA", "u2", ""),
        ];
        write_je_parquet(&path, &rows, None);

        let ap = extract_approver_prior_from_parquet(&path, 1)
            .expect("extract")
            .expect("Some");
        assert_eq!(ap.n_observations, 2, "JEs counted, not lines");
        assert!(
            (ap.approval_share - 0.5).abs() < 1e-12,
            "per-JE 0.5, not line-level 0.75"
        );
        assert!((ap.by_source["SA"] - 0.5).abs() < 1e-12);
    }

    /// With an amount column present, only amount-bearing JEs count. This is
    /// the same population as `source_mix_je` and the manual-share prior.
    #[test]
    fn zero_amount_jes_excluded_when_amount_column_present() {
        let tmp = tempfile::tempdir().expect("tmpdir");
        let path = tmp.path().join("je.parquet");
        // JE1 is approved but has only a zero-amount line -> excluded.
        // JE2 is approved with 10.0 -> counts. JE3 is unapproved with 5.0 -> counts.
        let rows = [
            ("2024/J1", "SA", "u1", "m1"),
            ("2024/J2", "SA", "u2", "m1"),
            ("2024/J3", "SA", "u3", ""),
        ];
        write_je_parquet(&path, &rows, Some(&[0.0, 10.0, 5.0][..]));

        let ap = extract_approver_prior_from_parquet(&path, 1)
            .expect("extract")
            .expect("Some");
        assert_eq!(ap.n_observations, 2, "zero-amount JE must not count");
        assert!((ap.approval_share - 0.5).abs() < 1e-12);
    }

    /// In per-JE mode the first *amount-bearing* row decides. A leading
    /// zero-amount line neither claims the JE nor supplies its approver.
    #[test]
    fn first_amount_bearing_row_decides_per_je_approval() {
        let tmp = tempfile::tempdir().expect("tmpdir");
        let path = tmp.path().join("je.parquet");
        // JE1 has an unapproved zero-amount line, then an approved 10.0 line,
        // so JE1 is approved. JE2 is unapproved at 5.0. The per-JE share is
        // 0.5; it would be 0.0 if the zero-amount line had won.
        let rows = [
            ("2024/J1", "SA", "u1", ""),
            ("2024/J1", "SA", "u1", "m1"),
            ("2024/J2", "SA", "u2", ""),
        ];
        write_je_parquet(&path, &rows, Some(&[0.0, 10.0, 5.0][..]));

        let ap = extract_approver_prior_from_parquet(&path, 1)
            .expect("extract")
            .expect("Some");
        assert_eq!(ap.n_observations, 2);
        assert!((ap.approval_share - 0.5).abs() < 1e-12);
    }

    /// A missing file surfaces as an I/O error whose kind is NotFound.
    #[test]
    fn missing_file_reports_not_found_io_error() {
        let tmp = tempfile::tempdir().expect("tmpdir");
        let res = extract_approver_prior_from_parquet(&tmp.path().join("absent.parquet"), 1);
        assert!(matches!(
            res,
            Err(FingerprintError::Io(ref e)) if e.kind() == std::io::ErrorKind::NotFound
        ));
    }

    /// A missing approver column gives Ok(None), and the per-source gate is respected.
    #[test]
    fn approver_prior_edges() {
        let tmp = tempfile::tempdir().expect("tmpdir");
        let path = tmp.path().join("je2.parquet");
        // High gate: per-source shares are suppressed, but the overall share is still present.
        let rows = vec![("SA", "u1", "m1"), ("SA", "u2", ""), ("RE", "u3", "m2")];
        write_parquet(&path, &rows);
        let ap = extract_approver_prior_from_parquet(&path, 100)
            .expect("extract")
            .expect("Some");
        assert!(ap.by_source.is_empty());
        assert!((ap.approval_share - 2.0 / 3.0).abs() < 1e-9);

        // No approver column at all -> None.
        let path2 = tmp.path().join("je3.parquet");
        write_batch(
            &path2,
            vec![utf8_field("Source"), utf8_field("EnterBy")],
            vec![utf8_column(vec![Some("SA")]), utf8_column(vec![Some("u1")])],
        );
        assert!(extract_approver_prior_from_parquet(&path2, 1)
            .expect("extract")
            .is_none());
    }
}