datasynth-generators 5.34.0

50+ data generators covering GL, P2P, O2C, S2C, HR, manufacturing, audit, tax, treasury, and ESG
Documentation
//! Persistent fraud-campaign planner (engine feature A1).
//!
//! The default anomaly DGP draws fraud targets independently each period, so cross-period account
//! *continuity* is absent and temporal / relational / memory detectors have nothing to grip
//! (FINDINGS §33). A **campaign** restructures a handful of existing journal entries per period into
//! a counterparty-pinned, relocation-structured scheme: a fixed beneficiary (counterparty) account
//! `C` persists across the whole campaign — the relocation-invariant handle (FINDINGS §36/§40) —
//! while the *booking leg* `B_t` rotates from a pool every `rotate_every_periods`. This is exactly
//! the adversary the self-play work studied, now generation-native.
//!
//! The planner operates on a `&mut [JournalEntry]` slice (it restructures existing entries rather
//! than growing the set): it rewrites one debit line's account to the period's booking leg and one
//! credit line's account to the pinned counterparty, preserving the entry's balance, then flags it
//! `is_fraud` and emits a [`LabeledAnomaly`] tagged with the campaign id, period, counterparty, and
//! booking leg. Accounts are drawn from those already present in the data, so they stay valid.
//!
//! Deterministic given the seed. Off unless [`FraudCampaignConfig::is_active`].

use datasynth_config::schema::{CarryForwardConfig, FraudCampaignConfig};
use datasynth_core::models::{
    AnomalyType, FraudType, JournalEntry, LabeledAnomaly, ObservabilityClass,
};
use rand::seq::SliceRandom;
use rand::{Rng, RngExt};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};

/// Plans persistent fraud campaigns over a batch of journal entries.
pub struct FraudCampaignPlanner<'a> {
    config: &'a FraudCampaignConfig,
}

impl<'a> FraudCampaignPlanner<'a> {
    /// New planner for the given campaign config.
    pub fn new(config: &'a FraudCampaignConfig) -> Self {
        Self { config }
    }

    /// Restructure entries into the configured campaigns and return the campaign labels. A no-op
    /// (empty result, entries untouched) when the config is inactive or there is insufficient
    /// material (too few accounts to form a counterparty + booking-leg pool, or no entries).
    pub fn plan<R: Rng>(&self, entries: &mut [JournalEntry], rng: &mut R) -> Vec<LabeledAnomaly> {
        if !self.config.is_active() || entries.is_empty() {
            return Vec::new();
        }

        // Distinct accounts present in the data — campaigns route through these so they stay valid.
        let mut accounts: Vec<String> = {
            let mut set: std::collections::BTreeSet<String> = std::collections::BTreeSet::new();
            for e in entries.iter() {
                for l in &e.lines {
                    set.insert(l.gl_account.clone());
                }
            }
            set.into_iter().collect()
        };
        // Need at least one counterparty + one booking leg, distinct.
        let pool_size = (self.config.booking_leg_pool as usize).max(1);
        if accounts.len() < 2 {
            return Vec::new();
        }

        // Bucket entry indices by period = floor((date - min_date) / period_days). A stable epoch
        // (the earliest posting date) keeps period 0 well-defined regardless of the run's start.
        let min_date = entries
            .iter()
            .map(|e| e.posting_date())
            .min()
            .expect("entries non-empty");
        let period_days = (self.config.period_days as i64).max(1);
        let mut by_period: BTreeMap<i64, Vec<usize>> = BTreeMap::new();
        for (i, e) in entries.iter().enumerate() {
            let period = (e.posting_date() - min_date).num_days() / period_days;
            by_period.entry(period).or_default().push(i);
        }

        let mut labels = Vec::new();
        let n_campaigns = (self.config.count as usize).max(1);
        for campaign_idx in 0..n_campaigns {
            // Pin a distinct counterparty per campaign (rotate the choice deterministically), and a
            // disjoint-ish booking-leg pool drawn from the remaining accounts.
            let cp_pos = rng.random_range(0..accounts.len());
            let counterparty = accounts[cp_pos].clone();
            let booking_pool: Vec<String> = accounts
                .iter()
                .filter(|a| **a != counterparty)
                .take(pool_size)
                .cloned()
                .collect();
            if booking_pool.is_empty() {
                continue;
            }
            let campaign_id = format!("CAMP-{campaign_idx:04}");

            for (&period, idxs) in &by_period {
                // Relocate the booking leg every `rotate_every_periods`.
                let rotate_every = (self.config.rotate_every_periods as i64).max(1);
                let leg_idx = ((period.max(0) / rotate_every) as usize) % booking_pool.len();
                let booking_leg = &booking_pool[leg_idx];

                let mut planted = 0u32;
                for &ei in idxs {
                    if planted >= self.config.per_period_count {
                        break;
                    }
                    if Self::restructure_entry(&mut entries[ei], booking_leg, &counterparty) {
                        let e = &entries[ei];
                        let mut label = LabeledAnomaly::new(
                            format!("{campaign_id}-P{period}-{}", e.document_number()),
                            // Routing funds through a pinned counterparty is round-tripping.
                            AnomalyType::Fraud(FraudType::RoundTripping),
                            e.document_number(),
                            "JournalEntry".to_string(),
                            e.header.company_code.clone(),
                            e.posting_date(),
                        )
                        .with_metadata("campaign_id", &campaign_id)
                        .with_metadata("campaign_period", &period.to_string())
                        .with_metadata("campaign_counterparty", &counterparty)
                        .with_metadata("campaign_booking_leg", booking_leg)
                        .with_related_entity(&counterparty)
                        .with_related_entity(booking_leg);
                        // A persistent counterparty campaign is the canonical memory-only family:
                        // the careful version is residual-faint and only carry-forward memory of the
                        // pinned counterparty catches it (FINDINGS §40-41).
                        label.observability = ObservabilityClass::MemoryOnly;
                        labels.push(label);
                        planted += 1;
                    }
                }
            }

            // Avoid two campaigns colliding on the same counterparty in tiny account universes.
            accounts.retain(|a| *a != counterparty);
            if accounts.len() < 2 {
                break;
            }
        }

        labels
    }

    /// Rewrite the first debit line to `booking_leg` and the first credit line to `counterparty`,
    /// preserving amounts (hence balance), and flag the entry as fraud. Returns `false` (entry
    /// untouched) if the entry lacks both a debit and a credit line.
    fn restructure_entry(entry: &mut JournalEntry, booking_leg: &str, counterparty: &str) -> bool {
        let debit_pos = entry.lines.iter().position(|l| l.is_debit());
        let credit_pos = entry.lines.iter().position(|l| !l.is_debit());
        let (Some(d), Some(c)) = (debit_pos, credit_pos) else {
            return false;
        };
        if d == c {
            return false;
        }
        entry.lines[d].gl_account = booking_leg.to_string();
        entry.lines[c].gl_account = counterparty.to_string();
        entry.header.is_fraud = true;
        true
    }
}

/// Convenience entry point used by the injector: build a [`FraudCampaignPlanner`] for `config`
/// and run it over `entries`.
///
/// Inactive configs plant nothing: the returned label list is empty and `entries` are untouched.
pub fn plan_campaigns<R: Rng>(
    config: &FraudCampaignConfig,
    entries: &mut [JournalEntry],
    rng: &mut R,
) -> Vec<LabeledAnomaly> {
    let planner = FraudCampaignPlanner::new(config);
    planner.plan(entries, rng)
}

/// One entry of the synthetic prior-year carry-forward register (§40/§59).
///
/// Because DataSynth knows the planted truth, it can emit the register an audit team would carry
/// forward: a partial (`confirmation_rate`) and noisy (`false_positive_rate`) set of confirmed
/// counterparties. The memory arm consumes `counterparty`; `is_true_finding` exists for
/// evaluation only.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct CarryForwardRecord {
    /// Beneficiary (counterparty) account confirmed in the prior period.
    pub counterparty: String,
    /// Ground truth: `true` for a real campaign counterparty, `false` for an auditor false
    /// positive.
    pub is_true_finding: bool,
}

/// Build the synthetic carry-forward register from this period's campaign labels.
///
/// Let `n_true` be the number of distinct true campaign counterparties, read from each label's
/// `campaign_counterparty` metadata. The register holds:
/// - `round(confirmation_rate * n_true)` true findings (rate clamped to `[0, 1]`), picked by
///   shuffling the true counterparties with `rng`;
/// - `round(false_positive_rate * n_true)` false positives (negative rates treated as 0), picked
///   by shuffling the distinct `legit_accounts` that are not true counterparties. Fewer are
///   emitted when that pool is smaller, and duplicate `legit_accounts` are ignored so no account
///   is confirmed twice.
///
/// Deterministic given `rng`. Empty when `carry_forward` is disabled.
pub fn build_carry_forward_register<R: Rng + RngExt>(
    campaign_labels: &[LabeledAnomaly],
    cfg: &CarryForwardConfig,
    legit_accounts: &[String],
    rng: &mut R,
) -> Vec<CarryForwardRecord> {
    if !cfg.enabled {
        return Vec::new();
    }

    // Distinct true campaign counterparties (from the label metadata the planner stamped).
    let true_set: BTreeSet<&str> = campaign_labels
        .iter()
        .filter_map(|l| l.metadata.get("campaign_counterparty").map(String::as_str))
        .collect();
    let n_true = true_set.len();

    // True findings: shuffle the sorted set, then confirm a `confirmation_rate` share of it.
    let mut true_cps: Vec<&str> = true_set.iter().copied().collect();
    true_cps.shuffle(rng);
    let n_conf = (cfg.confirmation_rate.clamp(0.0, 1.0) * n_true as f64).round() as usize;
    let mut register: Vec<CarryForwardRecord> = true_cps
        .into_iter()
        .take(n_conf)
        .map(|c| CarryForwardRecord {
            counterparty: c.to_owned(),
            is_true_finding: true,
        })
        .collect();

    // False positives: distinct legitimate accounts (never a true counterparty) wrongly
    // confirmed. First-occurrence order is kept so duplicate-free input shuffles identically.
    let mut seen: BTreeSet<&str> = BTreeSet::new();
    let mut pool: Vec<&str> = legit_accounts
        .iter()
        .map(String::as_str)
        .filter(|a| !true_set.contains(a) && seen.insert(*a))
        .collect();
    pool.shuffle(rng);
    let n_fp = (cfg.false_positive_rate.max(0.0) * n_true as f64).round() as usize;
    register.extend(pool.into_iter().take(n_fp).map(|c| CarryForwardRecord {
        counterparty: c.to_owned(),
        is_true_finding: false,
    }));
    register
}

// Unit tests: campaign planting (pinned counterparty, rotating booking leg, balance preserved),
// the inactive-config no-op, and the carry-forward register's confirmation/false-positive counts.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::NaiveDate;
    use datasynth_core::models::{JournalEntry, JournalEntryLine};
    use rand::SeedableRng;
    use rand_chacha::ChaCha8Rng;
    use rust_decimal_macros::dec;

    /// Build a balanced two-line entry that debits `debit_acct` and credits `credit_acct` by
    /// 100.00 each.
    fn je(doc: &str, date: NaiveDate, debit_acct: &str, credit_acct: &str) -> JournalEntry {
        let mut e = JournalEntry::new_simple(
            doc.to_string(),
            "1000".to_string(),
            date,
            "test".to_string(),
        );
        let did = e.header.document_id;
        e.add_line(JournalEntryLine::debit(
            did,
            1,
            debit_acct.to_string(),
            dec!(100.00),
        ));
        e.add_line(JournalEntryLine::credit(
            did,
            2,
            credit_acct.to_string(),
            dec!(100.00),
        ));
        e
    }

    /// 8 entries, two per month on the 5th of Jan-Apr 2026, over a 16-account universe. With
    /// 30-day periods anchored on the earliest date (Jan 5) they fall into periods 0, 1, 1 and 3
    /// (Feb 5 and Mar 5 share period 1), so the campaign spans 3 distinct periods. Checks that
    /// the counterparty is pinned across every period while the booking leg rotates, balance is
    /// preserved, and labels carry campaign metadata.
    #[test]
    fn campaign_pins_counterparty_and_rotates_booking_leg() {
        let cfg = FraudCampaignConfig {
            enabled: true,
            count: 1,
            per_period_count: 1,
            booking_leg_pool: 4,
            rotate_every_periods: 1,
            period_days: 30,
            ..Default::default()
        };
        let mut entries: Vec<JournalEntry> = (0..8)
            .map(|i| {
                let month = 1 + (i / 2); // 2 entries per month, months 1..4
                let date = NaiveDate::from_ymd_opt(2026, month as u32, 5).unwrap();
                je(
                    &format!("JE{i:03}"),
                    date,
                    &format!("4{i:03}"),
                    &format!("5{i:03}"),
                )
            })
            .collect();

        let mut rng = ChaCha8Rng::seed_from_u64(7);
        let labels = plan_campaigns(&cfg, &mut entries, &mut rng);
        assert!(!labels.is_empty(), "campaign must plant labels");

        // Every campaign label shares one pinned counterparty across all periods.
        let cps: std::collections::BTreeSet<&String> = labels
            .iter()
            .map(|l| l.metadata.get("campaign_counterparty").unwrap())
            .collect();
        assert_eq!(cps.len(), 1, "counterparty must be pinned across periods");
        let counterparty = cps.into_iter().next().unwrap();

        // The booking leg relocates across periods (more than one distinct leg across the
        // occupied periods).
        let legs: std::collections::BTreeSet<&String> = labels
            .iter()
            .map(|l| l.metadata.get("campaign_booking_leg").unwrap())
            .collect();
        assert!(legs.len() >= 2, "booking leg must relocate across periods");

        // Restructured entries: one per label, balance preserved, and the pinned counterparty
        // appears on each (the planner writes it to the credit leg).
        let fraud: Vec<&JournalEntry> = entries.iter().filter(|e| e.is_fraud()).collect();
        assert_eq!(fraud.len(), labels.len());
        for e in fraud {
            assert_eq!(e.total_debit(), e.total_credit(), "balance preserved");
            assert!(
                e.lines.iter().any(|l| &l.gl_account == counterparty),
                "the pinned counterparty appears on every campaign entry"
            );
        }

        // Labels are memory-only (the persistent-counterparty family).
        assert!(labels
            .iter()
            .all(|l| l.observability == ObservabilityClass::MemoryOnly));
    }

    /// A default (disabled) config and an enabled-but-degenerate one (`period_days: 0`) both
    /// plant nothing.
    #[test]
    fn disabled_or_inactive_config_is_a_noop() {
        let mut entries = vec![je(
            "JE1",
            NaiveDate::from_ymd_opt(2026, 1, 1).unwrap(),
            "4000",
            "5000",
        )];
        let mut rng = ChaCha8Rng::seed_from_u64(1);
        // Disabled.
        let off = FraudCampaignConfig::default();
        assert!(plan_campaigns(&off, &mut entries, &mut rng).is_empty());
        assert!(!entries[0].is_fraud());
        // Active flag but degenerate (period_days 0) → inactive → no-op.
        let degenerate = FraudCampaignConfig {
            enabled: true,
            period_days: 0,
            ..FraudCampaignConfig::default()
        };
        assert!(!degenerate.is_active());
        assert!(plan_campaigns(&degenerate, &mut entries, &mut rng).is_empty());
    }

    /// Four distinct true counterparties (C1-C4) and 50 legitimate accounts (L0-L49): a disabled
    /// register is empty; otherwise rounded rate * 4 true findings and false positives appear.
    #[test]
    fn carry_forward_register_confirms_a_fraction_and_adds_false_positives() {
        let labels: Vec<LabeledAnomaly> = ["C1", "C2", "C3", "C4"]
            .iter()
            .map(|c| {
                LabeledAnomaly::new(
                    "a".to_string(),
                    AnomalyType::Fraud(FraudType::RoundTripping),
                    "JE".to_string(),
                    "JE".to_string(),
                    "1000".to_string(),
                    NaiveDate::from_ymd_opt(2026, 1, 1).unwrap(),
                )
                .with_metadata("campaign_counterparty", c)
            })
            .collect();
        let legit: Vec<String> = (0..50).map(|i| format!("L{i}")).collect();
        let mut rng = ChaCha8Rng::seed_from_u64(3);

        // disabled → empty register
        assert!(build_carry_forward_register(
            &labels,
            &CarryForwardConfig::default(),
            &legit,
            &mut rng
        )
        .is_empty());

        // confirmation 0.5 of 4 → 2 true; false-positive 0.25 of 4 → 1 legit wrongly confirmed
        let cfg = CarryForwardConfig {
            enabled: true,
            confirmation_rate: 0.5,
            false_positive_rate: 0.25,
        };
        let reg = build_carry_forward_register(&labels, &cfg, &legit, &mut rng);
        let n_true = reg.iter().filter(|r| r.is_true_finding).count();
        let n_fp = reg.iter().filter(|r| !r.is_true_finding).count();
        assert_eq!(n_true, 2, "round(0.5 * 4) true counterparties confirmed");
        assert_eq!(n_fp, 1, "round(0.25 * 4) false positives");
        assert!(reg
            .iter()
            .filter(|r| r.is_true_finding)
            .all(|r| ["C1", "C2", "C3", "C4"].contains(&r.counterparty.as_str())));
        assert!(reg
            .iter()
            .filter(|r| !r.is_true_finding)
            .all(|r| r.counterparty.starts_with('L')));
    }
}