//! Processing-lag simulation: per-event lag distributions and cross-day
//! posting rules used to turn event timestamps into posting timestamps.

use chrono::{Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike};
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

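/// Business events that can carry a processing lag between when they occur and
/// when they are posted.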
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventType {
    SalesOrder,
    PurchaseOrder,
    GoodsReceipt,
    InvoiceReceipt,
    InvoiceIssue,
    Payment,
    JournalEntry,
    Accrual,
    Depreciation,
    Intercompany,
    PeriodClose,
}

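/// Shape of a lag distribution, tagged as `type` when (de)serialized.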
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LagDistributionType {
    Fixed { hours: f64 },
    Normal { mu: f64, sigma: f64 },
    LogNormal { mu: f64, sigma: f64 },
    Exponential { lambda: f64 },
}

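// The default lag is log-normal with mu = 0.5, i.e. a median of roughly 1.6 hours.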
impl Default for LagDistributionType {
    fn default() -> Self {
        Self::LogNormal { mu: 0.5, sigma: 0.8 }
    }
}

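/// A lag distribution plus clamping bounds, both expressed in hours.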
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LagDistribution {
    #[serde(default)]
    pub distribution: LagDistributionType,
    #[serde(default)]
    pub min_lag_hours: f64,
    #[serde(default = "default_max_lag")]
    pub max_lag_hours: f64,
}

fn default_max_lag() -> f64 {
    72.0
}

impl Default for LagDistribution {
    fn default() -> Self {
        Self {
            distribution: LagDistributionType::default(),
            min_lag_hours: 0.0,
            max_lag_hours: 72.0,
        }
    }
}

impl LagDistribution {
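    /// Degenerate distribution that always yields exactly `hours`.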
    pub fn fixed(hours: f64) -> Self {
        Self {
            distribution: LagDistributionType::Fixed { hours },
            min_lag_hours: hours,
            max_lag_hours: hours,
        }
    }

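    /// Log-normal lag (parameters on the log scale), clamped to 0..=72 hours.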
    pub fn log_normal(mu: f64, sigma: f64) -> Self {
        Self {
            distribution: LagDistributionType::LogNormal { mu, sigma },
            min_lag_hours: 0.0,
            max_lag_hours: 72.0,
        }
    }

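    /// Normal lag in hours, clamped to 0..=72 hours.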
    pub fn normal(mu: f64, sigma: f64) -> Self {
        Self {
            distribution: LagDistributionType::Normal { mu, sigma },
            min_lag_hours: 0.0,
            max_lag_hours: 72.0,
        }
    }

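    /// Draw a lag in hours, clamped to `[min_lag_hours, max_lag_hours]`.
    ///
    /// Normal and log-normal variates use the Box-Muller transform; the
    /// exponential variate uses inverse-transform sampling.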
    pub fn sample(&self, rng: &mut ChaCha8Rng) -> f64 {
        let raw = match &self.distribution {
            LagDistributionType::Fixed { hours } => *hours,
            LagDistributionType::Normal { mu, sigma } => {
                // Box-Muller transform: two uniforms yield one standard normal.
                let u1: f64 = rng.gen();
                let u2: f64 = rng.gen();
                let z = (-2.0 * u1.ln()).sqrt() * (2.0 * std::f64::consts::PI * u2).cos();
                mu + sigma * z
            }
            LagDistributionType::LogNormal { mu, sigma } => {
                let u1: f64 = rng.gen();
                let u2: f64 = rng.gen();
                let z = (-2.0 * u1.ln()).sqrt() * (2.0 * std::f64::consts::PI * u2).cos();
                (mu + sigma * z).exp()
            }
            LagDistributionType::Exponential { lambda } => {
                // Inverse-transform sampling: -ln(U) / lambda.
                let u: f64 = rng.gen();
                -u.ln() / lambda
            }
        };

        raw.clamp(self.min_lag_hours, self.max_lag_hours)
    }
}

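/// Rules for rolling late-captured events over to the next working day.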
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossDayConfig {
    #[serde(default = "default_true")]
    pub enabled: bool,
    #[serde(default)]
    pub probability_by_hour: HashMap<u8, f64>,
    #[serde(default = "default_work_start")]
    pub work_start_hour: u8,
    #[serde(default = "default_work_end")]
    pub work_end_hour: u8,
    #[serde(default = "default_cutoff")]
    pub same_day_cutoff_hour: u8,
}

fn default_true() -> bool {
    true
}

fn default_work_start() -> u8 {
    8
}

fn default_work_end() -> u8 {
    18
}

fn default_cutoff() -> u8 {
    16
}

impl Default for CrossDayConfig {
    fn default() -> Self {
        let mut probability_by_hour = HashMap::new();
        probability_by_hour.insert(17, 0.3);
        probability_by_hour.insert(18, 0.6);
        probability_by_hour.insert(19, 0.8);
        probability_by_hour.insert(20, 0.9);
        probability_by_hour.insert(21, 0.95);
        probability_by_hour.insert(22, 0.99);
        probability_by_hour.insert(23, 0.99);

        Self {
            enabled: true,
            probability_by_hour,
            work_start_hour: 8,
            work_end_hour: 18,
            same_day_cutoff_hour: 16,
        }
    }
}

impl CrossDayConfig {
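    /// Probability that an event captured at `hour` is posted on the next day.
    ///
    /// Explicit per-hour overrides take precedence; otherwise hours before the
    /// same-day cutoff never roll over, the remaining working hours roll over
    /// occasionally, and after-hours events usually roll over.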
    pub fn next_day_probability(&self, hour: u8) -> f64 {
        if !self.enabled {
            return 0.0;
        }

        if let Some(&prob) = self.probability_by_hour.get(&hour) {
            return prob;
        }

        if hour < self.same_day_cutoff_hour {
            0.0
        } else if hour < self.work_end_hour {
            0.2
        } else {
            0.8
        }
    }
}

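/// Top-level lag configuration: a fallback lag, per-event overrides, and
/// cross-day posting behaviour.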
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingLagConfig {
    #[serde(default = "default_true")]
    pub enabled: bool,

    #[serde(default)]
    pub default_lag: LagDistribution,

    #[serde(default)]
    pub event_lags: HashMap<EventType, LagDistribution>,

    #[serde(default)]
    pub cross_day: CrossDayConfig,
}

impl Default for ProcessingLagConfig {
    fn default() -> Self {
        let mut event_lags = HashMap::new();

        event_lags.insert(
            EventType::SalesOrder,
            LagDistribution::log_normal(0.5, 0.8),
        );
        event_lags.insert(
            EventType::PurchaseOrder,
            LagDistribution::log_normal(0.7, 0.6),
        );
        event_lags.insert(
            EventType::GoodsReceipt,
            LagDistribution::log_normal(1.0, 0.5),
        );
        event_lags.insert(
            EventType::InvoiceReceipt,
            LagDistribution::log_normal(1.5, 0.6),
        );
        event_lags.insert(
            EventType::InvoiceIssue,
            LagDistribution::log_normal(0.3, 0.5),
        );
        event_lags.insert(
            EventType::Payment,
            LagDistribution::log_normal(0.8, 0.7),
        );
        event_lags.insert(
            EventType::JournalEntry,
            LagDistribution::log_normal(0.0, 0.3),
        );
        event_lags.insert(EventType::Accrual, LagDistribution::fixed(0.0));
        event_lags.insert(EventType::Depreciation, LagDistribution::fixed(0.0));
        event_lags.insert(
            EventType::Intercompany,
            LagDistribution::log_normal(2.0, 0.8),
        );
        event_lags.insert(EventType::PeriodClose, LagDistribution::fixed(0.0));

        Self {
            enabled: true,
            default_lag: LagDistribution::log_normal(0.5, 0.8),
            event_lags,
            cross_day: CrossDayConfig::default(),
        }
    }
}

impl ProcessingLagConfig {
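    /// Lag distribution for `event_type`, falling back to `default_lag`.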
    pub fn get_lag_distribution(&self, event_type: EventType) -> &LagDistribution {
        self.event_lags
            .get(&event_type)
            .unwrap_or(&self.default_lag)
    }
}

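/// Turns event timestamps into posting timestamps by sampling per-event lags
/// from a seeded RNG, so runs are reproducible.
///
/// Illustrative usage (mirrors the unit tests below):
///
/// ```ignore
/// let mut calc = ProcessingLagCalculator::new(42);
/// let event = NaiveDateTime::new(
///     NaiveDate::from_ymd_opt(2024, 6, 15).unwrap(),
///     NaiveTime::from_hms_opt(10, 0, 0).unwrap(),
/// );
/// let posted = calc.calculate_posting_time(EventType::SalesOrder, event);
/// assert!(posted >= event);
/// ```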
pub struct ProcessingLagCalculator {
    config: ProcessingLagConfig,
    rng: ChaCha8Rng,
}

impl ProcessingLagCalculator {
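    /// Calculator with the default configuration and a fixed RNG seed.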
    pub fn new(seed: u64) -> Self {
        Self {
            config: ProcessingLagConfig::default(),
            rng: ChaCha8Rng::seed_from_u64(seed),
        }
    }

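    /// Calculator with an explicit configuration and a fixed RNG seed.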
    pub fn with_config(seed: u64, config: ProcessingLagConfig) -> Self {
        Self {
            config,
            rng: ChaCha8Rng::seed_from_u64(seed),
        }
    }

    /// Posting timestamp for an event: the event time plus a sampled lag.
    ///
    /// Events captured late in the day may instead roll over to the next
    /// working morning, and the result is never earlier than the event itself.
    pub fn calculate_posting_time(
        &mut self,
        event_type: EventType,
        event_datetime: NaiveDateTime,
    ) -> NaiveDateTime {
        if !self.config.enabled {
            return event_datetime;
        }

        let lag_dist = self.config.get_lag_distribution(event_type);
        let lag_hours = lag_dist.sample(&mut self.rng);

        let lag_seconds = (lag_hours * 3600.0) as i64;
        let mut posting_time = event_datetime + Duration::seconds(lag_seconds);

        // Late events may be deferred to a random minute of the next working morning.
        if self.should_post_next_day(event_datetime.hour() as u8) {
            let next_day = event_datetime.date() + Duration::days(1);
            let morning_hour = self.config.cross_day.work_start_hour as u32;
            let morning_minute: u32 = self.rng.gen_range(0..60);
            posting_time = NaiveDateTime::new(
                next_day,
                NaiveTime::from_hms_opt(morning_hour, morning_minute, 0)
                    .expect("valid next-morning posting time"),
            );
        }

        // A posting can never precede the event that triggered it.
        if posting_time < event_datetime {
            posting_time = event_datetime;
        }

        posting_time
    }

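    /// Bernoulli draw against the configured next-day probability for `hour`.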
    pub fn should_post_next_day(&mut self, hour: u8) -> bool {
        let prob = self.config.cross_day.next_day_probability(hour);
        self.rng.gen::<f64>() < prob
    }

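    /// Date-only convenience wrapper: treats the event as occurring at
    /// `event_hour:00` and returns the resulting posting date.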
    pub fn calculate_posting_date(
        &mut self,
        event_type: EventType,
        event_date: NaiveDate,
        event_hour: u8,
    ) -> NaiveDate {
        let event_time =
            NaiveTime::from_hms_opt(event_hour as u32, 0, 0).expect("valid event hour");
        let event_datetime = NaiveDateTime::new(event_date, event_time);
        self.calculate_posting_time(event_type, event_datetime)
            .date()
    }

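    /// Read-only access to the active configuration.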
    pub fn config(&self) -> &ProcessingLagConfig {
        &self.config
    }

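    /// Reseed the RNG so that a subsequent run reproduces the same lags.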
    pub fn reset(&mut self, seed: u64) {
        self.rng = ChaCha8Rng::seed_from_u64(seed);
    }
}

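/// Schema-facing configuration with one optional lag per document type;
/// converted into a runtime [`ProcessingLagConfig`] via `to_config`.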
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProcessingLagSchemaConfig {
    #[serde(default = "default_true")]
    pub enabled: bool,

    #[serde(default)]
    pub sales_order_lag: Option<LagSchemaConfig>,

    #[serde(default)]
    pub purchase_order_lag: Option<LagSchemaConfig>,

    #[serde(default)]
    pub goods_receipt_lag: Option<LagSchemaConfig>,

    #[serde(default)]
    pub invoice_receipt_lag: Option<LagSchemaConfig>,

    #[serde(default)]
    pub invoice_issue_lag: Option<LagSchemaConfig>,

    #[serde(default)]
    pub payment_lag: Option<LagSchemaConfig>,

    #[serde(default)]
    pub journal_entry_lag: Option<LagSchemaConfig>,

    #[serde(default)]
    pub cross_day_posting: Option<CrossDaySchemaConfig>,
}

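/// Log-normal lag parameters as they appear in the external schema.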
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LagSchemaConfig {
    pub mu: f64,
    pub sigma: f64,
    #[serde(default)]
    pub min_hours: Option<f64>,
    #[serde(default)]
    pub max_hours: Option<f64>,
}

impl LagSchemaConfig {
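    /// Build a clamped log-normal `LagDistribution`; missing bounds default to
    /// 0 and 72 hours.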
    pub fn to_distribution(&self) -> LagDistribution {
        LagDistribution {
            distribution: LagDistributionType::LogNormal {
                mu: self.mu,
                sigma: self.sigma,
            },
            min_lag_hours: self.min_hours.unwrap_or(0.0),
            max_lag_hours: self.max_hours.unwrap_or(72.0),
        }
    }
}

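/// Cross-day posting settings as they appear in the external schema.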
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CrossDaySchemaConfig {
    #[serde(default = "default_true")]
    pub enabled: bool,

    #[serde(default)]
    pub probability_by_hour: HashMap<u8, f64>,
}

impl ProcessingLagSchemaConfig {
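    /// Convert the schema configuration into a runtime `ProcessingLagConfig`,
    /// overriding only the lags and cross-day settings that were provided.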
    pub fn to_config(&self) -> ProcessingLagConfig {
        let mut config = ProcessingLagConfig {
            enabled: self.enabled,
            ..Default::default()
        };

        if let Some(lag) = &self.sales_order_lag {
            config
                .event_lags
                .insert(EventType::SalesOrder, lag.to_distribution());
        }
        if let Some(lag) = &self.purchase_order_lag {
            config
                .event_lags
                .insert(EventType::PurchaseOrder, lag.to_distribution());
        }
        if let Some(lag) = &self.goods_receipt_lag {
            config
                .event_lags
                .insert(EventType::GoodsReceipt, lag.to_distribution());
        }
        if let Some(lag) = &self.invoice_receipt_lag {
            config
                .event_lags
                .insert(EventType::InvoiceReceipt, lag.to_distribution());
        }
        if let Some(lag) = &self.invoice_issue_lag {
            config
                .event_lags
                .insert(EventType::InvoiceIssue, lag.to_distribution());
        }
        if let Some(lag) = &self.payment_lag {
            config
                .event_lags
                .insert(EventType::Payment, lag.to_distribution());
        }
        if let Some(lag) = &self.journal_entry_lag {
            config
                .event_lags
                .insert(EventType::JournalEntry, lag.to_distribution());
        }

        if let Some(cross_day) = &self.cross_day_posting {
            config.cross_day.enabled = cross_day.enabled;
            if !cross_day.probability_by_hour.is_empty() {
                config.cross_day.probability_by_hour = cross_day.probability_by_hour.clone();
            }
        }

        config
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_fixed_lag() {
        let lag = LagDistribution::fixed(2.0);
        let mut rng = ChaCha8Rng::seed_from_u64(42);

        for _ in 0..10 {
            assert!((lag.sample(&mut rng) - 2.0).abs() < 0.01);
        }
    }

    #[test]
    fn test_log_normal_lag() {
        let lag = LagDistribution::log_normal(0.5, 0.5);
        let mut rng = ChaCha8Rng::seed_from_u64(42);

        let mut samples: Vec<f64> = (0..1000).map(|_| lag.sample(&mut rng)).collect();
        samples.sort_by(|a, b| a.partial_cmp(b).unwrap());

        // The theoretical median of LogNormal(0.5, 0.5) is exp(0.5) ≈ 1.65 hours.
        let median = samples[500];
        assert!(median > 1.0 && median < 3.0);

        // All samples must respect the default [0, 72] hour clamp.
        assert!(samples.iter().all(|&x| (0.0..=72.0).contains(&x)));
    }

    #[test]
    fn test_cross_day_probability() {
        let config = CrossDayConfig::default();

        // Morning and mid-afternoon events stay on the same day.
        assert!(config.next_day_probability(8) < 0.1);
        assert!(config.next_day_probability(14) < 0.1);

        // The chance of next-day posting rises steadily after the cutoff.
        assert!(config.next_day_probability(17) > 0.2);
        assert!(config.next_day_probability(19) > 0.7);
        assert!(config.next_day_probability(22) > 0.9);
    }

    #[test]
    fn test_processing_lag_calculator() {
        let mut calc = ProcessingLagCalculator::new(42);

        let event_time = NaiveDateTime::new(
            NaiveDate::from_ymd_opt(2024, 6, 15).unwrap(),
            NaiveTime::from_hms_opt(10, 0, 0).unwrap(),
        );

        let posting_time = calc.calculate_posting_time(EventType::SalesOrder, event_time);

        assert!(posting_time >= event_time);

        let hours_diff = (posting_time - event_time).num_hours();
        assert!(hours_diff < 24);
    }

    #[test]
    fn test_late_event_cross_day() {
        let mut config = ProcessingLagConfig::default();
        // Force next-day posting for events captured at 22:00.
        config.cross_day.probability_by_hour.insert(22, 1.0);

        let mut calc = ProcessingLagCalculator::with_config(42, config);

        let event_time = NaiveDateTime::new(
            NaiveDate::from_ymd_opt(2024, 6, 15).unwrap(),
            NaiveTime::from_hms_opt(22, 0, 0).unwrap(),
        );

        let posting_time = calc.calculate_posting_time(EventType::SalesOrder, event_time);

        assert!(posting_time.date() > event_time.date());
    }

    #[test]
    fn test_event_specific_lags() {
        let config = ProcessingLagConfig::default();

        let accrual_lag = config.get_lag_distribution(EventType::Accrual);
        if let LagDistributionType::Fixed { hours } = accrual_lag.distribution {
            assert!((hours - 0.0).abs() < 0.01);
        } else {
            panic!("Accrual should have fixed lag");
        }

        // Invoice receipts should lag longer than sales orders on average.
        let invoice_lag = config.get_lag_distribution(EventType::InvoiceReceipt);
        let sales_lag = config.get_lag_distribution(EventType::SalesOrder);

        if let (
            LagDistributionType::LogNormal { mu: inv_mu, .. },
            LagDistributionType::LogNormal { mu: sales_mu, .. },
        ) = (&invoice_lag.distribution, &sales_lag.distribution)
        {
            assert!(inv_mu > sales_mu);
        }
    }

    #[test]
    fn test_schema_config_conversion() {
        let schema = ProcessingLagSchemaConfig {
            enabled: true,
            sales_order_lag: Some(LagSchemaConfig {
                mu: 1.0,
                sigma: 0.5,
                min_hours: Some(0.5),
                max_hours: Some(24.0),
            }),
            cross_day_posting: Some(CrossDaySchemaConfig {
                enabled: true,
                probability_by_hour: {
                    let mut m = HashMap::new();
                    m.insert(18, 0.5);
                    m
                },
            }),
            ..Default::default()
        };

        let config = schema.to_config();

        let sales_lag = config.get_lag_distribution(EventType::SalesOrder);
        assert!((sales_lag.min_lag_hours - 0.5).abs() < 0.01);
        assert!((sales_lag.max_lag_hours - 24.0).abs() < 0.01);

        assert_eq!(config.cross_day.probability_by_hour.get(&18), Some(&0.5));
    }

    #[test]
    fn test_calculate_posting_date() {
        let mut calc = ProcessingLagCalculator::new(42);

        let event_date = NaiveDate::from_ymd_opt(2024, 6, 15).unwrap();
        let posting_date = calc.calculate_posting_date(EventType::JournalEntry, event_date, 10);

        let days_diff = (posting_date - event_date).num_days();
        assert!(days_diff <= 1);
    }
}