use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::Arc;
use datasynth_core::error::{SynthError, SynthResult};
use datasynth_core::models::subledger::ar::{
DunningItem, DunningLetter, DunningRun, OnAccountPayment, PaymentCorrection, ShortPayment,
};
use datasynth_core::models::JournalEntry;
use datasynth_core::traits::Sink;
use datasynth_core::{DiskSpaceGuard, DiskSpaceGuardConfig};
/// Sink that writes [`JournalEntry`] values to a CSV file, with an optional
/// [`DiskSpaceGuard`] that is consulted periodically before writes.
pub struct CsvSink {
/// Buffered writer over the output file.
writer: BufWriter<File>,
/// Number of journal entries passed to `write`; incremented once per entry, not per CSV row.
items_written: u64,
/// Running byte total exposed through `bytes_written()`.
bytes_written: u64,
/// Set once the column header row has been emitted.
header_written: bool,
/// Optional shared guard used to check free disk space and to record written bytes.
disk_guard: Option<Arc<DiskSpaceGuard>>,
/// The guard is consulted only when `items_written` is a multiple of this value.
check_interval: u64,
}
impl CsvSink {
    /// Creates the output file at `path` and returns a sink without a disk guard.
    ///
    /// The disk-check interval starts at 500 items.
    ///
    /// # Errors
    /// Propagates the error from `File::create`.
    pub fn new(path: PathBuf) -> SynthResult<Self> {
        let writer = File::create(&path).map(|f| BufWriter::with_capacity(256 * 1024, f))?;
        Ok(Self {
            writer,
            items_written: 0,
            bytes_written: 0,
            header_written: false,
            disk_guard: None,
            check_interval: 500,
        })
    }

    /// Creates the output file at `path` and attaches a fresh [`DiskSpaceGuard`]
    /// configured with `min_free_mb` and the same path.
    ///
    /// # Errors
    /// Propagates the error from `File::create`.
    pub fn with_disk_guard(path: PathBuf, min_free_mb: usize) -> SynthResult<Self> {
        // The file is created first so a creation failure returns before any guard is built.
        let file = File::create(&path)?;
        let guard = DiskSpaceGuard::new(
            DiskSpaceGuardConfig::with_min_free_mb(min_free_mb).with_path(&path),
        );
        Ok(Self {
            writer: BufWriter::with_capacity(256 * 1024, file),
            items_written: 0,
            bytes_written: 0,
            header_written: false,
            disk_guard: Some(Arc::new(guard)),
            check_interval: 500,
        })
    }

    /// Installs (or replaces) the disk guard consulted by this sink.
    pub fn set_disk_guard(&mut self, guard: Arc<DiskSpaceGuard>) {
        self.disk_guard = Some(guard);
    }

    /// Sets how often the disk guard is consulted: whenever the number of
    /// items written so far is a multiple of `interval`.
    pub fn set_check_interval(&mut self, interval: u64) {
        self.check_interval = interval;
    }

    /// Consults the disk guard, if one is attached and the current item count
    /// falls on the check interval.
    ///
    /// # Errors
    /// Maps a failed guard check to `SynthError::disk_exhausted`.
    fn check_disk_space(&self) -> SynthResult<()> {
        let Some(guard) = self.disk_guard.as_ref() else {
            return Ok(());
        };
        if !self.items_written.is_multiple_of(self.check_interval) {
            return Ok(());
        }
        guard
            .check()
            .map_err(|e| SynthError::disk_exhausted(e.available_mb, e.required_mb))?;
        Ok(())
    }

    /// Reports `bytes` to the disk guard; does nothing when no guard is attached.
    fn record_write(&self, bytes: u64) {
        if let Some(guard) = self.disk_guard.as_ref() {
            guard.record_write(bytes);
        }
    }

    /// Emits the column header row the first time it is called; later calls are no-ops.
    ///
    /// The header's byte length is added to `bytes_written` and reported to the guard.
    fn write_header(&mut self) -> SynthResult<()> {
        if !self.header_written {
            const HEADER: &str = "document_id,company_code,fiscal_year,fiscal_period,posting_date,\
                document_type,currency,source,line_number,gl_account,debit_amount,credit_amount,\
                trading_partner,fraud_type,anomaly_type\n";
            let header_len = HEADER.len() as u64;
            self.writer.write_all(HEADER.as_bytes())?;
            self.bytes_written += header_len;
            self.record_write(header_len);
            self.header_written = true;
        }
        Ok(())
    }

    /// Returns the running byte total tracked by this sink.
    pub fn bytes_written(&self) -> u64 {
        self.bytes_written
    }
}
impl Sink for CsvSink {
    type Item = JournalEntry;

    /// Writes one CSV row per line of `item`, emitting the header row first if
    /// it has not been written yet.
    ///
    /// Each row repeats the entry's header columns followed by the line's own
    /// columns. The disk guard (if any) is consulted before anything is
    /// written, and `items_written` advances once per journal entry.
    ///
    /// Byte accounting uses the exact encoded length of every row, matching
    /// how the header row is counted.
    ///
    /// NOTE(review): fields are emitted without CSV quoting, so a value that
    /// contains a comma would shift columns — confirm upstream values cannot.
    ///
    /// # Errors
    /// Returns the disk-guard error or any I/O error from the underlying writer.
    fn write(&mut self, item: Self::Item) -> SynthResult<()> {
        self.check_disk_space()?;
        self.write_header()?;

        // Prefer the SAP source code when present; otherwise fall back to the
        // Debug rendering of the source enum.
        let source_label: std::borrow::Cow<'_, str> = match &item.header.sap_source_code {
            Some(code) => std::borrow::Cow::Borrowed(code.as_str()),
            None => std::borrow::Cow::Owned(format!("{:?}", item.header.source)),
        };
        let fraud_type_str = item
            .header
            .fraud_type
            .map(|ft| format!("{ft:?}"))
            .unwrap_or_default();
        // Borrowed directly: no per-entry allocation is needed for this cell.
        let anomaly_type_str = item.header.anomaly_type.as_deref().unwrap_or("");

        for line in &item.lines {
            // Format the row up front so its real byte length is known.
            let row = format!(
                "{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}\n",
                item.header.document_id,
                item.header.company_code,
                item.header.fiscal_year,
                item.header.fiscal_period,
                item.header.posting_date,
                item.header.document_type,
                item.header.currency,
                source_label,
                line.line_number,
                line.gl_account,
                line.debit_amount,
                line.credit_amount,
                line.trading_partner.as_deref().unwrap_or(""),
                fraud_type_str,
                anomaly_type_str,
            );
            self.writer.write_all(row.as_bytes())?;
            let row_bytes = row.len() as u64;
            self.bytes_written += row_bytes;
            self.record_write(row_bytes);
        }

        self.items_written += 1;
        Ok(())
    }

    /// Flushes buffered rows to the underlying file.
    fn flush(&mut self) -> SynthResult<()> {
        self.writer.flush()?;
        Ok(())
    }

    /// Flushes and consumes the sink.
    fn close(mut self) -> SynthResult<()> {
        self.flush()?;
        Ok(())
    }

    /// Returns the number of journal entries written so far.
    fn items_written(&self) -> u64 {
        self.items_written
    }
}
/// Sink that writes [`DunningRun`] records to a CSV file, one row per run.
pub struct DunningRunCsvSink {
/// Buffered writer over the output file.
writer: BufWriter<File>,
/// Number of runs written so far.
items_written: u64,
/// Set once the column header row has been emitted.
header_written: bool,
}
impl DunningRunCsvSink {
    /// Creates the output file at `path` and returns an empty sink.
    ///
    /// # Errors
    /// Propagates the error from `File::create`.
    pub fn new(path: PathBuf) -> SynthResult<Self> {
        let writer = File::create(&path).map(|f| BufWriter::with_capacity(256 * 1024, f))?;
        Ok(Self {
            writer,
            items_written: 0,
            header_written: false,
        })
    }

    /// Emits the column header row on the first call; later calls are no-ops.
    fn write_header(&mut self) -> SynthResult<()> {
        if !self.header_written {
            const HEADER: &str = "run_id,company_code,run_date,dunning_date,customers_evaluated,\
                customers_with_letters,letters_generated,total_amount_dunned,\
                total_dunning_charges,total_interest_amount,status,started_at,completed_at\n";
            self.writer.write_all(HEADER.as_bytes())?;
            self.header_written = true;
        }
        Ok(())
    }
}
impl Sink for DunningRunCsvSink {
    type Item = DunningRun;

    /// Writes `item` as a single CSV row, emitting the header row first if needed.
    ///
    /// `status` is rendered with its `Debug` representation.
    fn write(&mut self, item: Self::Item) -> SynthResult<()> {
        self.write_header()?;
        // `completed_at` is optional; `None` becomes an empty cell.
        let completed_at = item
            .completed_at
            .map(|d| d.to_string())
            .unwrap_or_default();
        writeln!(
            self.writer,
            "{},{},{},{},{},{},{},{},{},{},{:?},{},{}",
            item.run_id,
            item.company_code,
            item.run_date,
            item.dunning_date,
            item.customers_evaluated,
            item.customers_with_letters,
            item.letters_generated,
            item.total_amount_dunned,
            item.total_dunning_charges,
            item.total_interest_amount,
            item.status,
            item.started_at,
            completed_at,
        )?;
        self.items_written += 1;
        Ok(())
    }

    /// Flushes buffered rows to the underlying file.
    fn flush(&mut self) -> SynthResult<()> {
        Ok(self.writer.flush()?)
    }

    /// Flushes and consumes the sink.
    fn close(mut self) -> SynthResult<()> {
        self.flush()
    }

    /// Returns the number of runs written so far.
    fn items_written(&self) -> u64 {
        self.items_written
    }
}
/// Sink that writes [`DunningLetter`] records to a CSV file, one row per letter.
pub struct DunningLetterCsvSink {
/// Buffered writer over the output file.
writer: BufWriter<File>,
/// Number of letters written so far.
items_written: u64,
/// Set once the column header row has been emitted.
header_written: bool,
}
impl DunningLetterCsvSink {
    /// Creates the output file at `path` and returns an empty sink.
    ///
    /// # Errors
    /// Propagates the error from `File::create`.
    pub fn new(path: PathBuf) -> SynthResult<Self> {
        let writer = File::create(&path).map(|f| BufWriter::with_capacity(256 * 1024, f))?;
        Ok(Self {
            writer,
            items_written: 0,
            header_written: false,
        })
    }

    /// Emits the column header row on the first call; later calls are no-ops.
    fn write_header(&mut self) -> SynthResult<()> {
        if !self.header_written {
            const HEADER: &str = "letter_id,dunning_run_id,company_code,customer_id,customer_name,\
                dunning_level,dunning_date,total_dunned_amount,dunning_charges,\
                interest_amount,total_amount_due,currency,payment_deadline,\
                is_sent,sent_date,response_type,status\n";
            self.writer.write_all(HEADER.as_bytes())?;
            self.header_written = true;
        }
        Ok(())
    }
}
impl Sink for DunningLetterCsvSink {
    type Item = DunningLetter;

    /// Writes `item` as a single CSV row, emitting the header row first if needed.
    ///
    /// NOTE(review): `response_type` and `status` are rendered with `{:?}`; if
    /// `response_type` is an `Option`, cells will read `Some(..)`/`None` — confirm
    /// that is the intended output.
    fn write(&mut self, item: Self::Item) -> SynthResult<()> {
        self.write_header()?;
        // The name goes inside a quoted cell, so embedded quotes are doubled.
        let customer_name = item.customer_name.replace('"', "\"\"");
        // `sent_date` is optional; `None` becomes an empty cell.
        let sent_date = item.sent_date.map(|d| d.to_string()).unwrap_or_default();
        writeln!(
            self.writer,
            "{},{},{},{},\"{}\",{},{},{},{},{},{},{},{},{},{},{:?},{:?}",
            item.letter_id,
            item.dunning_run_id,
            item.company_code,
            item.customer_id,
            customer_name,
            item.dunning_level,
            item.dunning_date,
            item.total_dunned_amount,
            item.dunning_charges,
            item.interest_amount,
            item.total_amount_due,
            item.currency,
            item.payment_deadline,
            item.is_sent,
            sent_date,
            item.response_type,
            item.status,
        )?;
        self.items_written += 1;
        Ok(())
    }

    /// Flushes buffered rows to the underlying file.
    fn flush(&mut self) -> SynthResult<()> {
        Ok(self.writer.flush()?)
    }

    /// Flushes and consumes the sink.
    fn close(mut self) -> SynthResult<()> {
        self.flush()
    }

    /// Returns the number of letters written so far.
    fn items_written(&self) -> u64 {
        self.items_written
    }
}
/// Writer for [`DunningItem`] rows; each row is tagged with the id of the letter it belongs to.
pub struct DunningItemCsvSink {
/// Buffered writer over the output file.
writer: BufWriter<File>,
/// Number of rows written through `write_with_letter_id`.
items_written: u64,
/// Set once the column header row has been emitted.
header_written: bool,
}
impl DunningItemCsvSink {
    /// Creates the output file at `path` and returns an empty sink.
    ///
    /// # Errors
    /// Propagates the error from `File::create`.
    pub fn new(path: PathBuf) -> SynthResult<Self> {
        let file = File::create(&path)?;
        Ok(Self {
            writer: BufWriter::with_capacity(256 * 1024, file),
            items_written: 0,
            header_written: false,
        })
    }

    /// Emits the column header row on the first call; later calls are no-ops.
    fn write_header(&mut self) -> SynthResult<()> {
        if self.header_written {
            return Ok(());
        }
        let header = "letter_id,invoice_number,invoice_date,due_date,original_amount,\
            open_amount,days_overdue,interest_amount,previous_dunning_level,\
            new_dunning_level,is_blocked,block_reason\n";
        self.writer.write_all(header.as_bytes())?;
        self.header_written = true;
        Ok(())
    }

    /// Writes `item` as one CSV row whose first column is `letter_id`,
    /// emitting the header row first if needed.
    ///
    /// `block_reason` is free text, so it is written as a quoted cell with
    /// embedded quotes doubled — the same treatment the other sinks in this
    /// file give their free-text columns. A missing reason yields an empty
    /// quoted cell.
    ///
    /// # Errors
    /// Returns any I/O error from the underlying writer.
    pub fn write_with_letter_id(&mut self, letter_id: &str, item: &DunningItem) -> SynthResult<()> {
        self.write_header()?;
        let block_reason = item
            .block_reason
            .as_deref()
            .unwrap_or("")
            .replace('"', "\"\"");
        writeln!(
            self.writer,
            "{},{},{},{},{},{},{},{},{},{},{},\"{}\"",
            letter_id,
            item.invoice_number,
            item.invoice_date,
            item.due_date,
            item.original_amount,
            item.open_amount,
            item.days_overdue,
            item.interest_amount,
            item.previous_dunning_level,
            item.new_dunning_level,
            item.is_blocked,
            block_reason,
        )?;
        self.items_written += 1;
        Ok(())
    }

    /// Flushes buffered rows to the underlying file.
    pub fn flush(&mut self) -> SynthResult<()> {
        self.writer.flush()?;
        Ok(())
    }

    /// Returns the number of rows written so far.
    ///
    /// Exposes the counter that `write_with_letter_id` maintains, which was
    /// otherwise never readable.
    pub fn items_written(&self) -> u64 {
        self.items_written
    }
}
/// Sink that writes [`PaymentCorrection`] records to a CSV file, one row per correction.
pub struct PaymentCorrectionCsvSink {
/// Buffered writer over the output file.
writer: BufWriter<File>,
/// Number of corrections written so far.
items_written: u64,
/// Set once the column header row has been emitted.
header_written: bool,
}
impl PaymentCorrectionCsvSink {
    /// Creates the output file at `path` and returns an empty sink.
    ///
    /// # Errors
    /// Propagates the error from `File::create`.
    pub fn new(path: PathBuf) -> SynthResult<Self> {
        let writer = File::create(&path).map(|f| BufWriter::with_capacity(256 * 1024, f))?;
        Ok(Self {
            writer,
            items_written: 0,
            header_written: false,
        })
    }

    /// Emits the column header row on the first call; later calls are no-ops.
    fn write_header(&mut self) -> SynthResult<()> {
        if !self.header_written {
            const HEADER: &str = "correction_id,company_code,customer_id,original_payment_id,\
                correction_type,original_amount,correction_amount,currency,\
                correction_date,reversal_je_id,correcting_payment_id,status,\
                reason,bank_reference,chargeback_code,fee_amount\n";
            self.writer.write_all(HEADER.as_bytes())?;
            self.header_written = true;
        }
        Ok(())
    }
}
impl Sink for PaymentCorrectionCsvSink {
    type Item = PaymentCorrection;

    /// Writes `item` as a single CSV row, emitting the header row first if needed.
    ///
    /// `correction_type` and `status` are rendered with their `Debug`
    /// representation; absent optional ids become empty cells.
    fn write(&mut self, item: Self::Item) -> SynthResult<()> {
        self.write_header()?;
        // `reason` goes inside a quoted cell, so embedded quotes are doubled.
        let reason = item.reason.as_deref().unwrap_or("").replace('"', "\"\"");
        let reversal_je_id = item.reversal_je_id.as_deref().unwrap_or("");
        let correcting_payment_id = item.correcting_payment_id.as_deref().unwrap_or("");
        let bank_reference = item.bank_reference.as_deref().unwrap_or("");
        let chargeback_code = item.chargeback_code.as_deref().unwrap_or("");
        writeln!(
            self.writer,
            "{},{},{},{},{:?},{},{},{},{},{},{},{:?},\"{}\",{},{},{}",
            item.correction_id,
            item.company_code,
            item.customer_id,
            item.original_payment_id,
            item.correction_type,
            item.original_amount,
            item.correction_amount,
            item.currency,
            item.correction_date,
            reversal_je_id,
            correcting_payment_id,
            item.status,
            reason,
            bank_reference,
            chargeback_code,
            item.fee_amount,
        )?;
        self.items_written += 1;
        Ok(())
    }

    /// Flushes buffered rows to the underlying file.
    fn flush(&mut self) -> SynthResult<()> {
        Ok(self.writer.flush()?)
    }

    /// Flushes and consumes the sink.
    fn close(mut self) -> SynthResult<()> {
        self.flush()
    }

    /// Returns the number of corrections written so far.
    fn items_written(&self) -> u64 {
        self.items_written
    }
}
/// Sink that writes [`ShortPayment`] records to a CSV file, one row per short payment.
pub struct ShortPaymentCsvSink {
/// Buffered writer over the output file.
writer: BufWriter<File>,
/// Number of short payments written so far.
items_written: u64,
/// Set once the column header row has been emitted.
header_written: bool,
}
impl ShortPaymentCsvSink {
    /// Creates the output file at `path` and returns an empty sink.
    ///
    /// # Errors
    /// Propagates the error from `File::create`.
    pub fn new(path: PathBuf) -> SynthResult<Self> {
        let writer = File::create(&path).map(|f| BufWriter::with_capacity(256 * 1024, f))?;
        Ok(Self {
            writer,
            items_written: 0,
            header_written: false,
        })
    }

    /// Emits the column header row on the first call; later calls are no-ops.
    fn write_header(&mut self) -> SynthResult<()> {
        if !self.header_written {
            const HEADER: &str = "short_payment_id,company_code,customer_id,payment_id,invoice_id,\
                expected_amount,paid_amount,short_amount,currency,payment_date,\
                reason_code,reason_description,disposition,credit_memo_id,\
                write_off_je_id,rebill_invoice_id\n";
            self.writer.write_all(HEADER.as_bytes())?;
            self.header_written = true;
        }
        Ok(())
    }
}
impl Sink for ShortPaymentCsvSink {
    type Item = ShortPayment;

    /// Writes `item` as a single CSV row, emitting the header row first if needed.
    ///
    /// `reason_code` and `disposition` are rendered with their `Debug`
    /// representation; absent optional ids become empty cells.
    fn write(&mut self, item: Self::Item) -> SynthResult<()> {
        self.write_header()?;
        // The description goes inside a quoted cell, so embedded quotes are doubled.
        let reason_description = item
            .reason_description
            .as_deref()
            .unwrap_or("")
            .replace('"', "\"\"");
        let credit_memo_id = item.credit_memo_id.as_deref().unwrap_or("");
        let write_off_je_id = item.write_off_je_id.as_deref().unwrap_or("");
        let rebill_invoice_id = item.rebill_invoice_id.as_deref().unwrap_or("");
        writeln!(
            self.writer,
            "{},{},{},{},{},{},{},{},{},{},{:?},\"{}\",{:?},{},{},{}",
            item.short_payment_id,
            item.company_code,
            item.customer_id,
            item.payment_id,
            item.invoice_id,
            item.expected_amount,
            item.paid_amount,
            item.short_amount,
            item.currency,
            item.payment_date,
            item.reason_code,
            reason_description,
            item.disposition,
            credit_memo_id,
            write_off_je_id,
            rebill_invoice_id,
        )?;
        self.items_written += 1;
        Ok(())
    }

    /// Flushes buffered rows to the underlying file.
    fn flush(&mut self) -> SynthResult<()> {
        Ok(self.writer.flush()?)
    }

    /// Flushes and consumes the sink.
    fn close(mut self) -> SynthResult<()> {
        self.flush()
    }

    /// Returns the number of short payments written so far.
    fn items_written(&self) -> u64 {
        self.items_written
    }
}
/// Sink that writes [`OnAccountPayment`] records to a CSV file, one row per payment.
pub struct OnAccountPaymentCsvSink {
/// Buffered writer over the output file.
writer: BufWriter<File>,
/// Number of payments written so far.
items_written: u64,
/// Set once the column header row has been emitted.
header_written: bool,
}
impl OnAccountPaymentCsvSink {
    /// Creates the output file at `path` and returns an empty sink.
    ///
    /// # Errors
    /// Propagates the error from `File::create`.
    pub fn new(path: PathBuf) -> SynthResult<Self> {
        let writer = File::create(&path).map(|f| BufWriter::with_capacity(256 * 1024, f))?;
        Ok(Self {
            writer,
            items_written: 0,
            header_written: false,
        })
    }

    /// Emits the column header row on the first call; later calls are no-ops.
    fn write_header(&mut self) -> SynthResult<()> {
        if !self.header_written {
            const HEADER: &str = "on_account_id,company_code,customer_id,payment_id,amount,\
                remaining_amount,currency,received_date,status,reason,\
                applications_count,notes\n";
            self.writer.write_all(HEADER.as_bytes())?;
            self.header_written = true;
        }
        Ok(())
    }
}
impl Sink for OnAccountPaymentCsvSink {
    type Item = OnAccountPayment;

    /// Writes `item` as a single CSV row, emitting the header row first if needed.
    ///
    /// `status` and `reason` are rendered with their `Debug` representation;
    /// the applications are summarised by their count only.
    fn write(&mut self, item: Self::Item) -> SynthResult<()> {
        self.write_header()?;
        let applications_count = item.applications.len();
        // `notes` goes inside a quoted cell, so embedded quotes are doubled.
        let notes = item.notes.as_deref().unwrap_or("").replace('"', "\"\"");
        writeln!(
            self.writer,
            "{},{},{},{},{},{},{},{},{:?},{:?},{},\"{}\"",
            item.on_account_id,
            item.company_code,
            item.customer_id,
            item.payment_id,
            item.amount,
            item.remaining_amount,
            item.currency,
            item.received_date,
            item.status,
            item.reason,
            applications_count,
            notes,
        )?;
        self.items_written += 1;
        Ok(())
    }

    /// Flushes buffered rows to the underlying file.
    fn flush(&mut self) -> SynthResult<()> {
        Ok(self.writer.flush()?)
    }

    /// Flushes and consumes the sink.
    fn close(mut self) -> SynthResult<()> {
        self.flush()
    }

    /// Returns the number of payments written so far.
    fn items_written(&self) -> u64 {
        self.items_written
    }
}
#[cfg(test)]
mod tests {
    use datasynth_core::models::{JournalEntry, JournalEntryHeader, JournalEntryLine};
    use datasynth_core::traits::Sink;
    use rust_decimal_macros::dec;

    /// Builds a balanced two-line entry whose debit line carries a trading partner.
    fn make_je_with_trading_partner() -> JournalEntry {
        use chrono::NaiveDate;
        let mut entry = JournalEntry::new(JournalEntryHeader::new(
            "US10".to_string(),
            NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
        ));
        let doc_id = entry.header.document_id;
        let mut debit = JournalEntryLine::debit(doc_id, 1, "1000".to_string(), dec!(500.00));
        debit.trading_partner = Some("VENDOR_A".to_string());
        entry.lines.push(debit);
        entry
            .lines
            .push(JournalEntryLine::credit(doc_id, 2, "2000".to_string(), dec!(500.00)));
        entry
    }

    /// Builds a balanced two-line entry flagged as a `DuplicatePayment` fraud.
    fn make_je_with_fraud_type() -> JournalEntry {
        use chrono::NaiveDate;
        use datasynth_core::models::FraudType;
        let mut header = JournalEntryHeader::new(
            "FR10".to_string(),
            NaiveDate::from_ymd_opt(2024, 6, 1).unwrap(),
        );
        header.is_fraud = true;
        header.fraud_type = Some(FraudType::DuplicatePayment);
        let mut entry = JournalEntry::new(header);
        let doc_id = entry.header.document_id;
        entry
            .lines
            .push(JournalEntryLine::debit(doc_id, 1, "2000".to_string(), dec!(200.00)));
        entry
            .lines
            .push(JournalEntryLine::credit(doc_id, 2, "1000".to_string(), dec!(200.00)));
        entry
    }

    /// Writes `entry` through a fresh `CsvSink` backed by a temp file and
    /// returns the flushed file contents.
    fn write_to_csv(entry: JournalEntry) -> String {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let mut sink = super::CsvSink::new(path.clone()).unwrap();
        sink.write(entry).unwrap();
        sink.flush().unwrap();
        std::fs::read_to_string(&path).unwrap()
    }

    #[test]
    fn csv_writer_includes_trading_partner_column() {
        let contents = write_to_csv(make_je_with_trading_partner());
        let mut lines = contents.lines();

        let header = lines.next().expect("no header line");
        assert!(
            header.contains("trading_partner"),
            "header missing trading_partner column; got: {header}"
        );
        assert!(
            header.contains("fraud_type"),
            "header missing fraud_type column; got: {header}"
        );
        assert!(
            header.contains("anomaly_type"),
            "header missing anomaly_type column; got: {header}"
        );

        let row1 = lines.next().expect("no first data row");
        assert!(
            row1.contains(",VENDOR_A,"),
            "expected row1 to contain ',VENDOR_A,'; got: {row1}"
        );
        assert!(
            row1.ends_with("VENDOR_A,,"),
            "expected row1 to end with 'VENDOR_A,,'; got: {row1}"
        );

        let row2 = lines.next().expect("no second data row");
        assert!(
            row2.ends_with(",,"),
            "expected row2 to end with ',,' (empty trading_partner, fraud_type, anomaly_type); got: {row2}"
        );
    }

    #[test]
    fn csv_sink_fraud_type_column_populated() {
        let contents = write_to_csv(make_je_with_fraud_type());
        let mut lines = contents.lines();
        let _header = lines.next().expect("no header");
        let row1 = lines.next().expect("no data row");
        assert!(
            row1.contains("DuplicatePayment"),
            "expected 'DuplicatePayment' in fraud_type column; got: {row1}"
        );
    }

    #[test]
    fn csv_sink_fraud_type_none_is_empty() {
        let contents = write_to_csv(make_je_with_trading_partner());
        let mut lines = contents.lines();
        let _header = lines.next().expect("no header");
        let row1 = lines.next().expect("no data row");

        let cols: Vec<&str> = row1.split(',').collect();
        assert_eq!(
            cols.len(),
            15,
            "expected 15 columns in CsvSink row, got {}; row: {row1}",
            cols.len()
        );
        // Columns are zero-indexed here: 13 is fraud_type, 14 is anomaly_type.
        assert!(
            cols[13].is_empty(),
            "expected empty fraud_type (col 14); got: {}",
            cols[13]
        );
        assert!(
            cols[14].is_empty(),
            "expected empty anomaly_type (col 15); got: {}",
            cols[14]
        );
    }
}