use chrono::{Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike};
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Business event categories that can each carry a distinct
/// processing-lag profile (see `ProcessingLagConfig::event_lags`).
/// Serialized in snake_case, e.g. `"sales_order"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventType {
    SalesOrder,
    PurchaseOrder,
    GoodsReceipt,
    InvoiceReceipt,
    InvoiceIssue,
    Payment,
    JournalEntry,
    Accrual,
    Depreciation,
    Intercompany,
    PeriodClose,
}
/// The family of sampling laws a lag can follow. Internally tagged for
/// serde, e.g. `{"type": "fixed", "hours": 2.0}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LagDistributionType {
    /// Constant lag of exactly `hours`.
    Fixed {
        hours: f64,
    },
    /// Gaussian lag with mean `mu` and standard deviation `sigma`, in hours.
    Normal {
        mu: f64,
        sigma: f64,
    },
    /// Log-normal lag; `mu` and `sigma` parameterize the underlying normal
    /// (so the median lag is `exp(mu)` hours).
    LogNormal {
        mu: f64,
        sigma: f64,
    },
    /// Exponential lag with rate `lambda` (mean `1/lambda` hours).
    Exponential {
        lambda: f64,
    },
}
impl Default for LagDistributionType {
    /// Moderate log-normal lag (median `exp(0.5)` ≈ 1.6 hours), used when a
    /// config omits the distribution entirely.
    fn default() -> Self {
        Self::LogNormal { mu: 0.5, sigma: 0.8 }
    }
}
/// A sampling law plus hard clamp bounds applied to every drawn lag.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LagDistribution {
    /// The underlying sampling law (defaults to log-normal(0.5, 0.8)).
    #[serde(default)]
    pub distribution: LagDistributionType,
    /// Lower clamp in hours (defaults to 0.0 via `f64::default`).
    #[serde(default)]
    pub min_lag_hours: f64,
    /// Upper clamp in hours (defaults to 72).
    #[serde(default = "default_max_lag")]
    pub max_lag_hours: f64,
}
/// Serde default for `max_lag_hours`: 72 hours (3 days).
/// A function path is required because serde defaults cannot be literals.
fn default_max_lag() -> f64 {
    72.0 }
impl Default for LagDistribution {
    /// Log-normal(0.5, 0.8) lag clamped to the standard [0, 72] hour window;
    /// mirrors the per-field serde defaults.
    fn default() -> Self {
        Self {
            min_lag_hours: 0.0,
            max_lag_hours: 72.0,
            distribution: LagDistributionType::default(),
        }
    }
}
impl LagDistribution {
    /// A degenerate distribution that always yields exactly `hours`.
    ///
    /// The clamp bounds are pinned to the same value so `sample` cannot
    /// drift even if the bounds are later edited independently.
    pub fn fixed(hours: f64) -> Self {
        Self {
            distribution: LagDistributionType::Fixed { hours },
            min_lag_hours: hours,
            max_lag_hours: hours,
        }
    }
    /// Log-normal lag with the given underlying-normal parameters, clamped
    /// to the default [0, 72] hour window.
    pub fn log_normal(mu: f64, sigma: f64) -> Self {
        Self {
            distribution: LagDistributionType::LogNormal { mu, sigma },
            min_lag_hours: 0.0,
            max_lag_hours: 72.0,
        }
    }
    /// Normal (Gaussian) lag, clamped to the default [0, 72] hour window.
    pub fn normal(mu: f64, sigma: f64) -> Self {
        Self {
            distribution: LagDistributionType::Normal { mu, sigma },
            min_lag_hours: 0.0,
            max_lag_hours: 72.0,
        }
    }
    /// Draw one standard-normal variate (mean 0, variance 1) via the
    /// Box-Muller transform.
    ///
    /// Consumes exactly two uniform draws from `rng`, preserving the draw
    /// order of the previous inline implementations so seeded output is
    /// unchanged. A `u1` of exactly 0 yields an infinite `z`; the caller's
    /// clamp folds that back into range.
    fn standard_normal(rng: &mut ChaCha8Rng) -> f64 {
        let u1: f64 = rng.random();
        let u2: f64 = rng.random();
        (-2.0 * u1.ln()).sqrt() * (2.0 * std::f64::consts::PI * u2).cos()
    }
    /// Sample a lag in hours, clamped to `[min_lag_hours, max_lag_hours]`.
    ///
    /// NOTE(review): `f64::clamp` panics when `min > max`; a deserialized
    /// config with inverted bounds would panic here — consider validating
    /// bounds at load time.
    pub fn sample(&self, rng: &mut ChaCha8Rng) -> f64 {
        let raw = match &self.distribution {
            LagDistributionType::Fixed { hours } => *hours,
            LagDistributionType::Normal { mu, sigma } => mu + sigma * Self::standard_normal(rng),
            LagDistributionType::LogNormal { mu, sigma } => {
                (mu + sigma * Self::standard_normal(rng)).exp()
            }
            LagDistributionType::Exponential { lambda } => {
                // Inverse-CDF sampling; u == 0 yields +inf, which the clamp
                // below folds back to `max_lag_hours`.
                let u: f64 = rng.random();
                -u.ln() / lambda
            }
        };
        raw.clamp(self.min_lag_hours, self.max_lag_hours)
    }
}
/// Controls when an event is deferred to the next working morning instead
/// of being posted (with sampled lag) on the same day.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossDayConfig {
    /// When false, `next_day_probability` always returns 0.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Explicit next-day probability per hour of day (0-23); entries here
    /// override the tiered fallback in `next_day_probability`.
    #[serde(default)]
    pub probability_by_hour: HashMap<u8, f64>,
    /// Hour the working day starts; deferred events post at this hour.
    #[serde(default = "default_work_start")]
    pub work_start_hour: u8,
    /// Hour the working day ends (fallback tier boundary).
    #[serde(default = "default_work_end")]
    pub work_end_hour: u8,
    /// Events before this hour fall into the 0-probability fallback tier.
    #[serde(default = "default_cutoff")]
    pub same_day_cutoff_hour: u8,
}
// Serde default helpers: serde's `default = "…"` requires a function path,
// not a literal value.
fn default_true() -> bool {
    true
}
fn default_work_start() -> u8 {
    8
}
fn default_work_end() -> u8 {
    18
}
fn default_cutoff() -> u8 {
    16
}
impl Default for CrossDayConfig {
    /// Standard office-hours profile: 8-18 workday, 16:00 same-day cutoff,
    /// and next-day posting growing steadily more likely after 17:00.
    fn default() -> Self {
        let probability_by_hour: HashMap<u8, f64> = [
            (17, 0.3),
            (18, 0.6),
            (19, 0.8),
            (20, 0.9),
            (21, 0.95),
            (22, 0.99),
            (23, 0.99),
        ]
        .into_iter()
        .collect();
        Self {
            enabled: true,
            probability_by_hour,
            work_start_hour: 8,
            work_end_hour: 18,
            same_day_cutoff_hour: 16,
        }
    }
}
impl CrossDayConfig {
    /// Probability that an event occurring at `hour` (0-23) is posted the
    /// following day.
    ///
    /// Explicit entries in `probability_by_hour` win; otherwise a three-tier
    /// fallback applies: 0.0 before the same-day cutoff, 0.2 for the rest of
    /// the workday, 0.8 after hours. Always 0.0 when disabled.
    pub fn next_day_probability(&self, hour: u8) -> f64 {
        if !self.enabled {
            return 0.0;
        }
        match self.probability_by_hour.get(&hour) {
            Some(&explicit) => explicit,
            None if hour < self.same_day_cutoff_hour => 0.0,
            None if hour < self.work_end_hour => 0.2,
            None => 0.8,
        }
    }
}
/// Runtime configuration for the lag simulation: a fallback distribution,
/// per-event overrides, and cross-day posting rules.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingLagConfig {
    /// Master switch; when false, posting time equals event time.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Fallback distribution for event types absent from `event_lags`.
    #[serde(default)]
    pub default_lag: LagDistribution,
    /// Per-event-type lag overrides.
    #[serde(default)]
    pub event_lags: HashMap<EventType, LagDistribution>,
    /// Next-day posting behavior.
    #[serde(default)]
    pub cross_day: CrossDayConfig,
}
impl Default for ProcessingLagConfig {
fn default() -> Self {
let mut event_lags = HashMap::new();
event_lags.insert(
EventType::SalesOrder,
LagDistribution::log_normal(0.5, 0.8), );
event_lags.insert(
EventType::PurchaseOrder,
LagDistribution::log_normal(0.7, 0.6), );
event_lags.insert(
EventType::GoodsReceipt,
LagDistribution::log_normal(1.0, 0.5), );
event_lags.insert(
EventType::InvoiceReceipt,
LagDistribution::log_normal(1.5, 0.6), );
event_lags.insert(
EventType::InvoiceIssue,
LagDistribution::log_normal(0.3, 0.5), );
event_lags.insert(
EventType::Payment,
LagDistribution::log_normal(0.8, 0.7), );
event_lags.insert(
EventType::JournalEntry,
LagDistribution::log_normal(0.0, 0.3), );
event_lags.insert(
EventType::Accrual,
LagDistribution::fixed(0.0), );
event_lags.insert(
EventType::Depreciation,
LagDistribution::fixed(0.0), );
event_lags.insert(
EventType::Intercompany,
LagDistribution::log_normal(2.0, 0.8), );
event_lags.insert(
EventType::PeriodClose,
LagDistribution::fixed(0.0), );
Self {
enabled: true,
default_lag: LagDistribution::log_normal(0.5, 0.8),
event_lags,
cross_day: CrossDayConfig::default(),
}
}
}
impl ProcessingLagConfig {
    /// The lag distribution for `event_type`, falling back to `default_lag`
    /// when no per-event override is configured.
    pub fn get_lag_distribution(&self, event_type: EventType) -> &LagDistribution {
        match self.event_lags.get(&event_type) {
            Some(dist) => dist,
            None => &self.default_lag,
        }
    }
}
/// Stateful helper that turns event timestamps into posting timestamps,
/// using a seeded RNG so output is reproducible for a given seed.
pub struct ProcessingLagCalculator {
    // Active lag configuration.
    config: ProcessingLagConfig,
    // Deterministic RNG; reseed via `reset` to replay a sequence.
    rng: ChaCha8Rng,
}
impl ProcessingLagCalculator {
    /// Create a calculator with `ProcessingLagConfig::default()` and a
    /// deterministic RNG seeded from `seed`.
    pub fn new(seed: u64) -> Self {
        Self {
            config: ProcessingLagConfig::default(),
            rng: ChaCha8Rng::seed_from_u64(seed),
        }
    }
    /// Create a calculator with an explicit configuration.
    pub fn with_config(seed: u64, config: ProcessingLagConfig) -> Self {
        Self {
            config,
            rng: ChaCha8Rng::seed_from_u64(seed),
        }
    }
    /// Compute when an event is posted: the event time plus a sampled
    /// processing lag, with late-in-the-day events possibly deferred to the
    /// next working morning (see [`CrossDayConfig`]). The result is never
    /// earlier than `event_datetime`.
    ///
    /// # Panics
    /// Panics if `cross_day.work_start_hour` is not a valid hour (0-23).
    pub fn calculate_posting_time(
        &mut self,
        event_type: EventType,
        event_datetime: NaiveDateTime,
    ) -> NaiveDateTime {
        if !self.config.enabled {
            return event_datetime;
        }
        let lag_dist = self.config.get_lag_distribution(event_type);
        let lag_hours = lag_dist.sample(&mut self.rng);
        // Truncation toward zero is fine: sub-second precision is noise here.
        let lag_seconds = (lag_hours * 3600.0) as i64;
        let mut posting_time = event_datetime + Duration::seconds(lag_seconds);
        // Cross-day deferral is decided from the *event* hour, not the lagged
        // posting hour: an event late in the day rolls to next morning.
        if self.should_post_next_day(event_datetime.hour() as u8) {
            let next_day = event_datetime.date() + Duration::days(1);
            let morning_hour = self.config.cross_day.work_start_hour as u32;
            // Randomize the minute so deferred events don't all post at :00.
            let morning_minute: u32 = self.rng.random_range(0..60);
            posting_time = NaiveDateTime::new(
                next_day,
                NaiveTime::from_hms_opt(morning_hour, morning_minute, 0)
                    // Fixed: the previous message ("valid distribution
                    // params") was a copy-paste error; this fails only on an
                    // out-of-range configured work_start_hour.
                    .expect("cross_day.work_start_hour must be a valid hour (0-23)"),
            );
        }
        // Guard: posting can never precede the event itself.
        if posting_time < event_datetime {
            posting_time = event_datetime;
        }
        posting_time
    }
    /// Bernoulli draw: should an event occurring at `hour` be posted the
    /// following day? Consumes one RNG draw even when the probability is 0,
    /// keeping the RNG stream position independent of the outcome.
    pub fn should_post_next_day(&mut self, hour: u8) -> bool {
        let prob = self.config.cross_day.next_day_probability(hour);
        self.rng.random::<f64>() < prob
    }
    /// Date-granularity wrapper around
    /// [`calculate_posting_time`](Self::calculate_posting_time), assuming the
    /// event occurred at the top of `event_hour`.
    ///
    /// # Panics
    /// Panics if `event_hour` is not a valid hour (0-23).
    pub fn calculate_posting_date(
        &mut self,
        event_type: EventType,
        event_date: NaiveDate,
        event_hour: u8,
    ) -> NaiveDate {
        // Fixed: the expect message previously said "valid distribution
        // params", which pointed debuggers at the wrong cause.
        let event_time = NaiveTime::from_hms_opt(event_hour as u32, 0, 0)
            .expect("event_hour must be a valid hour (0-23)");
        let event_datetime = NaiveDateTime::new(event_date, event_time);
        self.calculate_posting_time(event_type, event_datetime)
            .date()
    }
    /// Read-only access to the active configuration.
    pub fn config(&self) -> &ProcessingLagConfig {
        &self.config
    }
    /// Re-seed the RNG, making subsequent output reproducible from `seed`.
    pub fn reset(&mut self, seed: u64) {
        self.rng = ChaCha8Rng::seed_from_u64(seed);
    }
}
/// Schema-facing (deserialization) view of the lag settings, with one
/// optional override field per supported event type. Convert to the runtime
/// form with `to_config`.
///
/// NOTE(review): the derived `Default` yields `enabled = false`, while the
/// serde default (`default_true`) yields `true` for a missing field —
/// confirm this divergence is intentional.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProcessingLagSchemaConfig {
    /// Master switch, copied into `ProcessingLagConfig::enabled`.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional per-event lag overrides; `None` keeps the built-in default.
    #[serde(default)]
    pub sales_order_lag: Option<LagSchemaConfig>,
    #[serde(default)]
    pub purchase_order_lag: Option<LagSchemaConfig>,
    #[serde(default)]
    pub goods_receipt_lag: Option<LagSchemaConfig>,
    #[serde(default)]
    pub invoice_receipt_lag: Option<LagSchemaConfig>,
    #[serde(default)]
    pub invoice_issue_lag: Option<LagSchemaConfig>,
    #[serde(default)]
    pub payment_lag: Option<LagSchemaConfig>,
    #[serde(default)]
    pub journal_entry_lag: Option<LagSchemaConfig>,
    /// Optional cross-day posting overrides.
    #[serde(default)]
    pub cross_day_posting: Option<CrossDaySchemaConfig>,
}
/// Schema-level description of a single lag: always log-normal, with
/// optional clamp bounds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LagSchemaConfig {
    /// Mean of the underlying normal (log-hours); median lag is `exp(mu)`.
    pub mu: f64,
    /// Standard deviation of the underlying normal.
    pub sigma: f64,
    /// Optional lower clamp in hours; 0 when absent.
    #[serde(default)]
    pub min_hours: Option<f64>,
    /// Optional upper clamp in hours; 72 when absent.
    #[serde(default)]
    pub max_hours: Option<f64>,
}
impl LagSchemaConfig {
    /// Convert to a runtime [`LagDistribution`] — always log-normal — with
    /// missing bounds defaulting to the standard [0, 72] hour window.
    pub fn to_distribution(&self) -> LagDistribution {
        let lower = self.min_hours.unwrap_or_default();
        let upper = self.max_hours.unwrap_or(72.0);
        LagDistribution {
            distribution: LagDistributionType::LogNormal {
                mu: self.mu,
                sigma: self.sigma,
            },
            min_lag_hours: lower,
            max_lag_hours: upper,
        }
    }
}
/// Schema-level cross-day settings: only the enable flag and explicit
/// per-hour probabilities are exposed here; work hours keep their defaults.
///
/// NOTE(review): derived `Default` gives `enabled = false`, but the serde
/// default (`default_true`) gives `true` for a missing field — confirm this
/// asymmetry is intended.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CrossDaySchemaConfig {
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Overrides for `CrossDayConfig::probability_by_hour`; an empty map
    /// means "keep the defaults" (see `to_config`).
    #[serde(default)]
    pub probability_by_hour: HashMap<u8, f64>,
}
impl ProcessingLagSchemaConfig {
    /// Lower this schema-level config into a runtime [`ProcessingLagConfig`],
    /// starting from the built-in defaults and overriding only the fields
    /// that are present.
    pub fn to_config(&self) -> ProcessingLagConfig {
        let mut config = ProcessingLagConfig {
            enabled: self.enabled,
            ..Default::default()
        };
        // Map each optional schema field to its runtime event type; absent
        // fields leave the default distribution in place.
        let overrides = [
            (EventType::SalesOrder, &self.sales_order_lag),
            (EventType::PurchaseOrder, &self.purchase_order_lag),
            (EventType::GoodsReceipt, &self.goods_receipt_lag),
            (EventType::InvoiceReceipt, &self.invoice_receipt_lag),
            (EventType::InvoiceIssue, &self.invoice_issue_lag),
            (EventType::Payment, &self.payment_lag),
            (EventType::JournalEntry, &self.journal_entry_lag),
        ];
        for (event_type, maybe_lag) in overrides {
            if let Some(lag) = maybe_lag {
                config.event_lags.insert(event_type, lag.to_distribution());
            }
        }
        if let Some(cross_day) = &self.cross_day_posting {
            config.cross_day.enabled = cross_day.enabled;
            // An empty hour map means "keep the defaults", not "clear them".
            if !cross_day.probability_by_hour.is_empty() {
                config.cross_day.probability_by_hour = cross_day.probability_by_hour.clone();
            }
        }
        config
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A fixed distribution must always return exactly the configured lag.
    #[test]
    fn test_fixed_lag() {
        let lag = LagDistribution::fixed(2.0);
        let mut rng = ChaCha8Rng::seed_from_u64(42);
        for _ in 0..10 {
            assert!((lag.sample(&mut rng) - 2.0).abs() < 0.01);
        }
    }

    /// Log-normal(0.5, 0.5): the sample median should land near
    /// exp(0.5) ≈ 1.65, and every sample must respect the [0, 72] clamp.
    #[test]
    fn test_log_normal_lag() {
        let lag = LagDistribution::log_normal(0.5, 0.5);
        let mut rng = ChaCha8Rng::seed_from_u64(42);
        let mut samples: Vec<f64> = (0..1000).map(|_| lag.sample(&mut rng)).collect();
        samples.sort_by(|a, b| a.partial_cmp(b).unwrap());
        let median = samples[500];
        assert!(median > 1.0 && median < 3.0);
        assert!(samples.iter().all(|&x| (0.0..=72.0).contains(&x)));
    }

    /// Default cross-day probabilities: near zero during the workday,
    /// rising sharply in the evening.
    #[test]
    fn test_cross_day_probability() {
        let config = CrossDayConfig::default();
        assert!(config.next_day_probability(8) < 0.1);
        assert!(config.next_day_probability(14) < 0.1);
        assert!(config.next_day_probability(17) > 0.2);
        assert!(config.next_day_probability(19) > 0.7);
        assert!(config.next_day_probability(22) > 0.9);
    }

    /// Posting never precedes the event, and a mid-morning sales order
    /// posts within a day under default lags.
    #[test]
    fn test_processing_lag_calculator() {
        let mut calc = ProcessingLagCalculator::new(42);
        let event_time = NaiveDateTime::new(
            NaiveDate::from_ymd_opt(2024, 6, 15).unwrap(),
            NaiveTime::from_hms_opt(10, 0, 0).unwrap(),
        );
        let posting_time = calc.calculate_posting_time(EventType::SalesOrder, event_time);
        assert!(posting_time >= event_time);
        let hours_diff = (posting_time - event_time).num_hours();
        assert!(hours_diff < 24);
    }

    /// With a forced (probability 1.0) cross-day rule at 22:00, a 22:00
    /// event must post on a later date.
    #[test]
    fn test_late_event_cross_day() {
        let mut config = ProcessingLagConfig::default();
        config.cross_day.probability_by_hour.insert(22, 1.0);
        let mut calc = ProcessingLagCalculator::with_config(42, config);
        let event_time = NaiveDateTime::new(
            NaiveDate::from_ymd_opt(2024, 6, 15).unwrap(),
            NaiveTime::from_hms_opt(22, 0, 0).unwrap(),
        );
        let posting_time = calc.calculate_posting_time(EventType::SalesOrder, event_time);
        assert!(posting_time.date() > event_time.date());
    }

    /// Default config: accruals post immediately (fixed 0), and invoice
    /// receipts lag longer than sales orders (larger log-normal mu).
    #[test]
    fn test_event_specific_lags() {
        let config = ProcessingLagConfig::default();
        let accrual_lag = config.get_lag_distribution(EventType::Accrual);
        if let LagDistributionType::Fixed { hours } = accrual_lag.distribution {
            assert!((hours - 0.0).abs() < 0.01);
        } else {
            panic!("Accrual should have fixed lag");
        }
        let invoice_lag = config.get_lag_distribution(EventType::InvoiceReceipt);
        let sales_lag = config.get_lag_distribution(EventType::SalesOrder);
        if let (
            LagDistributionType::LogNormal { mu: inv_mu, .. },
            LagDistributionType::LogNormal { mu: sales_mu, .. },
        ) = (&invoice_lag.distribution, &sales_lag.distribution)
        {
            assert!(inv_mu > sales_mu);
        }
    }

    /// Schema-to-runtime conversion carries over clamp bounds and explicit
    /// cross-day hour probabilities.
    #[test]
    fn test_schema_config_conversion() {
        let schema = ProcessingLagSchemaConfig {
            enabled: true,
            sales_order_lag: Some(LagSchemaConfig {
                mu: 1.0,
                sigma: 0.5,
                min_hours: Some(0.5),
                max_hours: Some(24.0),
            }),
            cross_day_posting: Some(CrossDaySchemaConfig {
                enabled: true,
                probability_by_hour: {
                    let mut m = HashMap::new();
                    m.insert(18, 0.5);
                    m
                },
            }),
            ..Default::default()
        };
        let config = schema.to_config();
        let sales_lag = config.get_lag_distribution(EventType::SalesOrder);
        assert!((sales_lag.min_lag_hours - 0.5).abs() < 0.01);
        assert!((sales_lag.max_lag_hours - 24.0).abs() < 0.01);
        assert_eq!(config.cross_day.probability_by_hour.get(&18), Some(&0.5));
    }

    /// Date-level wrapper: a morning journal entry (tight lag) posts on the
    /// same or next day.
    #[test]
    fn test_calculate_posting_date() {
        let mut calc = ProcessingLagCalculator::new(42);
        let event_date = NaiveDate::from_ymd_opt(2024, 6, 15).unwrap();
        let posting_date = calc.calculate_posting_date(EventType::JournalEntry, event_date, 10);
        let days_diff = (posting_date - event_date).num_days();
        assert!(days_diff <= 1);
    }
}