1pub mod entity;
2pub mod math;
3pub mod model;
4pub mod reporter;
5pub mod rnd_pool;
6pub mod steps;
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::collections::HashMap;
11use std::collections::VecDeque;
12use std::fmt::Display;
13use std::sync::atomic::AtomicI64;
14use std::sync::Arc;
15use std::thread;
16
17use console::style;
18use crossbeam::channel::Receiver;
19use crossbeam::channel::Sender;
20use entity::customer::Customer;
21use entity::inventory::IncomingShipment;
22use entity::inventory::InventoryCheck;
23use entity::inventory::InventoryRecord;
24use entity::order::Order;
25use entity::product::Product;
26use entity::shipment::Shipment;
27use fieldx_plus::child_build;
28use fieldx_plus::fx_plus;
29use model::customer::CustomerModel;
30use model::product::ProductModel;
31use rand::Rng;
32use rand_distr::num_traits::ToPrimitive;
33use rand_distr::Bernoulli;
34use rand_distr::Distribution;
35use rand_distr::Geometric;
36use rand_distr::Normal;
37use rand_distr::Poisson;
38use reporter::FormattedReporter;
39use reporter::Reporter;
40use reporter::SwReporter;
41use reporter::TaskStatus;
42use rnd_pool::RndPool;
43use sea_orm::prelude::Uuid;
44use steps::ScriptTitle;
45use steps::Step;
46
47use super::db::entity::Order as DbOrder;
48use super::db::entity::Session as DbSession;
49use super::types::simerr;
50use super::types::OrderStatus;
51use super::types::Result;
52
53thread_local! {
54 static BATCH_STEPS: RefCell<Vec<Step>> = const { RefCell::new(Vec::new()) };
55}
56
57const LOGIN_EXPIRE_DAYS: i32 = 7;
58const SESSION_FACTOR: usize = 10;
59
60enum AddPostProc {
61 BackorderNew(i32),
63 BackorderAgain(i32),
64 Pending(i32),
66 Shipping(DbOrder),
67 None,
68}
69
70#[derive(Clone, Debug)]
71enum TaskData {
72 Purchase(PurchaseTask),
74 Returns,
76 Pending(Vec<usize>),
78 Sessions,
80}
81
82#[derive(Clone, Debug)]
83struct PurchaseTask {
84 customers: Vec<Arc<Customer>>,
85 product_interests: Arc<Vec<(i32, f64)>>,
86}
87
88#[derive(Clone, Debug)]
89struct Task {
90 data: TaskData,
91 tx: Sender<Result<()>>,
92}
93
94impl Task {
95 fn done(&self, result: Result<()>) -> Result<()> {
96 Ok(self.tx.send(result)?)
97 }
98}
99
100struct TaskResult {
101 rx: Receiver<Result<()>>,
102}
103
104impl TaskResult {
105 fn wait(&self) -> Result<()> {
106 self.rx.recv()?
107 }
108}
109
110#[fx_plus(parent, sync, rc, builder(post_build, opt_in))]
112pub struct ScriptWriter {
113 #[fieldx(get(copy), builder)]
115 quiet: bool,
116 #[fieldx(default(365), builder)]
118 period: i32,
119
120 #[fieldx(lock, get(copy), set, default(0))]
121 current_day: i32,
122
123 #[fieldx(get(copy), default(1), builder)]
125 initial_customers: u32,
126
127 #[fieldx(get(copy), default(10_000), builder)]
129 market_capacity: u32,
130
131 #[fieldx(get(copy), default(3_000), builder)]
133 inflection_point: u32,
134
135 #[fieldx(get(copy), default(0.05), builder)]
137 growth_rate: f64,
138
139 #[fieldx(get(copy), default(0.15), builder)]
140 min_customer_orders: f64,
141 #[fieldx(get(copy), default(3.0), builder)]
144 max_customer_orders: f64,
145
146 #[fieldx(get(copy), default(10), builder)]
147 product_count: i32,
148
149 #[fieldx(get(copy), default(30), builder)]
150 return_window: i32,
151
152 #[fieldx(lock, get, get_mut)]
154 products: Vec<Product>,
155
156 #[fieldx(lazy, get)]
158 return_probs: BTreeMap<i32, f64>,
159
160 #[fieldx(lock, get, get_mut, set, default(Vec::with_capacity(100_000_000)))]
161 steps: Vec<Step>,
162
163 #[fieldx(inner_mut, get_mut)]
165 returns: HashMap<i32, Vec<Order>>,
166
167 #[fieldx(lock, get, get_mut)]
172 backorders: Vec<VecDeque<usize>>,
173
174 #[fieldx(lazy, clearer, get(copy), default(0))]
175 total_backordered: usize,
176
177 #[fieldx(lock, get, get_mut)]
180 pending_orders: HashMap<i32, Vec<usize>>,
181
182 #[fieldx(lock, get, get_mut)]
184 inventory: Vec<InventoryRecord>,
185
186 #[fieldx(lock, get, get_mut)]
188 shipments: BTreeMap<i32, Vec<Shipment>>,
189
190 #[fieldx(lock, get, get_mut)]
192 product_shipping: BTreeMap<i32, i32>,
193
194 #[fieldx(lock, get, get_mut, default(HashMap::new()))]
196 customers: HashMap<String, Arc<Customer>>,
197
198 #[fieldx(lock, get, get_mut)]
200 customer_counts: VecDeque<u32>,
201
202 #[fieldx(get(copy), default(7))]
204 customer_history_length: usize,
205
206 #[fieldx(inner_mut, get(copy), set)]
208 next_day_customers: usize,
209
210 #[fieldx(lazy, get(clone))]
211 random_pool: Arc<RndPool>,
212
213 #[fieldx(builder(off))]
214 customer_model: Option<CustomerModel>,
215
216 #[fieldx(lazy, get)]
217 product_price_model: Arc<ProductModel>,
218
219 #[fieldx(lazy, clearer, private, builder(off), get)]
221 task_tx: Sender<Task>,
222 #[fieldx(lazy, private, builder(off), get(copy))]
223 worker_count: usize,
224 #[fieldx(lock, private, get, get_mut, builder(off))]
225 task_handlers: Vec<std::thread::JoinHandle<usize>>,
226
227 #[fieldx(lazy, private, get, builder(off))]
228 reporter: Reporter,
229
230 #[fieldx(lock, clearer, get(copy), set, builder(off))]
231 track_product: usize,
232
233 next_session_id: AtomicI64,
234
235 #[fieldx(lazy, private, clearer, get(copy), builder(off))]
237 session_base: i64,
238
239 #[fieldx(lazy, inner_mut, get, get_mut, builder(off))]
240 customer_sessions: Vec<Option<DbSession>>,
241}
242
243impl ScriptWriter {
244 fn post_build(mut self) -> Self {
245 self.set_next_day_customers(self.initial_customers() as usize);
246
247 self.setup_customer_model();
248
249 self.customer_model()
253 .adjust_growth_rate(self.customer_model().market_capacity() * 0.75, self.period * 4 / 5);
254
255 self
256 }
257
258 pub fn create(&self) -> Result<Vec<Step>> {
259 self.direct()?;
260 self.reporter().stop()?;
261 self.clear_task_tx();
262 Ok(self.set_steps(Vec::new()))
263 }
264
265 pub fn direct(&self) -> Result<()> {
266 self.reporter().start()?;
267
268 self.add_step(Step::Title(ScriptTitle {
269 period: self.period,
270 products: self.product_count(),
271 market_capacity: self.market_capacity,
272 }))?;
273
274 self.init_products()?;
275 self.register_customers()?;
276 self.order_shipments(true)?;
277
278 for day in 1..=self.period {
279 self.set_current_day(day);
280 self.clear_session_base();
281 self.add_step(Step::Day(day))?;
282 let sessions_task = self.submit_task(TaskData::Sessions)?;
283 self.replenish_inventory()?;
284 self.fulfill_pending()?;
285 self.register_customers()?;
286 self.process_returns()?;
287 self.place_orders()?;
288
289 self.order_shipments(false)?;
291 sessions_task.wait()?;
292 self.add_step(Step::CollectSessions)?;
293 self.log_progress()?;
294 }
295
296 self.reporter().out("--- Simulation finished ---")?;
297 self.clear_task_tx();
298 while !self.task_handlers().is_empty() {
299 self.task_handlers_mut().pop().unwrap().join().unwrap();
300 }
301 self.reporter().out("--- All threads finished ---")?;
302
303 self.reporter().stop()?;
304
305 Ok(())
306 }
307
308 fn maybe_init_batch_steps(&self, batch_steps: &mut Vec<Step>) {
309 if batch_steps.capacity() == 0 {
310 batch_steps.reserve(
311 self.customer_model().market_capacity() as usize * self.max_customer_orders() as usize * 2
312 / self.worker_count(),
313 );
314 }
315 }
316
317 fn submit_task(&self, data: TaskData) -> Result<TaskResult> {
318 let (tx, rx) = crossbeam::channel::unbounded();
319 let task = Task { data, tx };
320 let task_result = TaskResult { rx };
321 self.task_tx().send(task)?;
322 Ok(task_result)
323 }
324
325 fn task_purchases(&self, task: &PurchaseTask) -> Result<()> {
326 let mut rng = rand::rng();
327 let thread = thread::current();
328 let thread_id = thread.name().unwrap_or("unknown");
329 let return_probs = self.return_probs().clone();
330
331 BATCH_STEPS.with_borrow_mut(|batch_steps| -> Result<()> {
332 self.maybe_init_batch_steps(batch_steps);
333
334 let init_capacity = batch_steps.capacity();
335 let mut crecords = Vec::with_capacity(task.customers.len());
336 let mut expected_steps = 0;
337 let mut planned_returns: BTreeMap<i32, Vec<Order>> = BTreeMap::new();
338 let return_window = self.return_window() as f32;
339 let current_day = self.current_day();
340
341 for customer in task.customers.iter() {
342 let customer_purchases = customer.daily_purchases()?;
343 expected_steps += customer_purchases as usize;
344 crecords.push((customer, customer_purchases));
345
346 if customer_purchases > 0.0 || Bernoulli::new(1.0 - (-customer_purchases).exp()).unwrap().sample(&mut rng) {
351 self.customer_login(customer.id())?;
352 }
353 }
354
355 let mut total_purchases = 0;
356 for (customer, customer_purchases) in crecords {
359 let customer_id = customer.id();
360
361 if customer_purchases == 0.0 {
365 continue;
367 }
368
369 let mut purchases = 0;
370
371 for (product_id, product_interest) in task.product_interests.iter() {
372
373 if rng.random_range(0.0..1.0) <= self.products()[*product_id as usize].view_probability() {
375 batch_steps.push(Step::ViewProduct(*product_id));
376 }
377
378 let product_items = Poisson::new(customer_purchases * product_interest)
379 .unwrap()
380 .sample(&mut rng)
381 .round() as i32;
382
383 if product_items == 0 {
384 continue;
386 }
387
388 let order = Order {
389 id: Uuid::new_v4(),
390 customer_id,
391 product_id: *product_id,
392 quantity: product_items,
393 status: OrderStatus::New,
394 };
395
396 if rng.random_range(0.0..1.0) < *return_probs.get(product_id).unwrap() {
397 let on_day =
398 rng.random_range(0.0..return_window).round() as i32 + 1 + current_day;
399 if on_day < self.period {
400 planned_returns
401 .entry(on_day)
402 .or_default()
403 .push(order.clone());
404 }
405 }
406
407 self.adjust_capacity(batch_steps, 100);
408 batch_steps.push(Step::AddOrder(order.into()));
409
410 purchases += product_items;
411 }
412
413 total_purchases += purchases;
414 }
415
416 if batch_steps.capacity() > init_capacity {
417 self.reporter().out(&format!(
418 "Thread {}: Purchase task steps capacity increased from {} to {} (expected steps: {}, total steps: {}, batch steps: {})",
419 thread_id,
420 init_capacity,
421 batch_steps.capacity(),
422 expected_steps,
423 total_purchases,
424 batch_steps.len()
425 ))?;
426 }
427
428 self.add_steps(batch_steps.drain(0..batch_steps.len()))?;
429
430 let mut returns = self.returns_mut();
431 for (day, orders) in planned_returns {
432 returns.entry(day).or_default().extend(orders);
433 }
434
435 Ok(())
436 })
437 }
438
439 fn task_returns(&self) -> Result<()> {
440 BATCH_STEPS.with_borrow_mut(|batch_steps| -> Result<()> {
441 self.maybe_init_batch_steps(batch_steps);
442 let mut returns = self.returns_mut();
443
444 if let Some(today_returns) = returns.remove(&self.current_day()) {
446 for mut order in today_returns {
453 order.status = OrderStatus::Refunded;
454 batch_steps.push(Step::UpdateOrder(order.into()));
455 }
456 }
457
458 self.add_steps(batch_steps.drain(0..batch_steps.len()))?;
459
460 Ok(())
461 })
462 }
463
464 fn task_pending(&self, indicies: &[usize]) -> Result<()> {
465 BATCH_STEPS.with_borrow_mut(|batch_steps| -> Result<()> {
466 self.maybe_init_batch_steps(batch_steps);
467
468 for step_idx in indicies.iter().copied() {
469 let mut order = self.order_by_idx(step_idx)?;
470
471 order.status = OrderStatus::Shipped;
472 batch_steps.push(Step::UpdateOrder(order));
473 }
474
475 self.add_steps(batch_steps.drain(0..batch_steps.len()))?;
476
477 Ok(())
478 })
479 }
480
481 fn next_session_id(&self) -> i64 {
482 self.next_session_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + self.session_base()
483 }
484
485 fn task_sessions(&self) -> Result<()> {
486 let day = self.current_day();
494 let anticipated_sessions = (self.customers().len() * SESSION_FACTOR) as f64;
495 let sigma = anticipated_sessions * 0.2;
496
497 let todays_sessions = Normal::new(anticipated_sessions, sigma)
498 .map_err(|err| simerr!("Normal distribution: {}", err))?
499 .sample(&mut rand::rng())
500 .round() as u32;
501
502 for _ in 0..todays_sessions {
503 self.add_step(Step::AddSession(DbSession {
504 id: self.next_session_id(),
505 customer_id: None,
506 expires_on: day + 3,
507 }))?;
508 }
509
510 Ok(())
511 }
512
513 fn task_worker(&self, thread_id: usize, rx: Receiver<Task>) -> Result<()> {
514 let thread_name = thread::current()
515 .name()
516 .ok_or(simerr!("Worker thread must have a name"))?
517 .to_owned();
518
519 loop {
520 let task = {
521 let Ok(task) = rx.recv()
522 else {
523 break;
524 };
525 task
526 };
527
528 self.reporter().set_task_status(thread_id, TaskStatus::Busy);
529
530 task.done(
531 match task.data {
532 TaskData::Purchase(ref p_task) => self.task_purchases(p_task),
533 TaskData::Returns => self.task_returns(),
534 TaskData::Pending(ref p_task) => self.task_pending(p_task),
535 TaskData::Sessions => self.task_sessions(),
536 }
537 .inspect_err(|err| {
538 err.context(format!("task worker {thread_name}"));
539 }),
540 )?;
541
542 self.reporter().set_task_status(thread_id, TaskStatus::Idle);
543 }
544
545 Ok(())
546 }
547
548 fn log_progress(&self) -> Result<()> {
549 let reporter = self.reporter();
550 reporter.set_scenario_lines(self.steps().len());
551 let total_backordered = self.total_backordered();
552 reporter.set_backorders(total_backordered);
553 let total_pending = self.pending_orders().values().map(|queue| queue.len()).sum();
554 reporter.set_pending_orders(total_pending);
555 if total_backordered > 100_000 {
556 let backorders = self.backorders();
558 let mut bo = backorders.iter().enumerate().collect::<Vec<_>>();
559 bo.sort_by(|a, b| b.1.len().cmp(&a.1.len()));
560 let product_id = bo[0].0;
561 self.set_track_product(product_id);
562 let product = &self.products()[product_id];
563 reporter.out(&format!(
564 "Top backordered product {}: {} orders:\n Daily quotient: {:.2}, daily estimate: {:.2}, expected return rate: {:.2}\n Stock supplies in: {:.2}, supplier inaccuracy: {:.2}, supplier tardiness: {:.2}",
565 product.id(),
566 bo[0].1.len(),
567 product.daily_quotient(),
568 product.daily_estimate(),
569 product.expected_return_rate(),
570 product.stock_supplies_in(),
571 product.supplier_inaccuracy(),
572 product.supplier_tardiness(),
573 ))?;
574 }
575 else {
576 self.clear_track_product();
577 }
578
579 let mut check_steps = vec![];
580 for inv_rec in self.inventory().iter() {
581 check_steps.push(Step::CheckInventory(InventoryCheck::new(
582 inv_rec.product_id(),
583 inv_rec.stock(),
584 "daily",
585 )));
586 }
587 self.add_steps(check_steps)?;
588
589 self.reporter().refresh_report()
600 }
601
602 #[inline(always)]
603 fn adjust_capacity<T>(&self, steps: &mut Vec<T>, delta: usize) {
604 let step_count = steps.len();
605 if (step_count + delta) >= steps.capacity() {
606 steps.reserve(step_count);
608 }
610 }
611
612 fn process_order(&self, mut step: Step) -> Result<(Step, AddPostProc)> {
613 let order = match step {
614 Step::AddOrder(ref mut order) | Step::UpdateOrder(ref mut order) => order,
615 _ => return Err(simerr!("Non-order step submitted for order processing: {:?}", step)),
616 };
617
618 let mut post_proc = AddPostProc::None;
619
620 if order.status == OrderStatus::New || order.status == OrderStatus::Recheck {
621 let new_order = order.status == OrderStatus::New;
622 let mut inventory = self.inventory_mut();
623 let Some(inv_rec) = inventory.get_mut(order.product_id as usize)
624 else {
625 return Err(simerr!("Product {} not found in inventory", order.product_id));
626 };
627
628 if inv_rec.stock() < order.quantity as i64 {
629 order.status = OrderStatus::Backordered;
631 if new_order {
632 post_proc = AddPostProc::BackorderNew(order.product_id);
633 }
634 else {
635 post_proc = AddPostProc::BackorderAgain(order.product_id);
636 }
637 }
638 else {
639 order.status = OrderStatus::Pending;
640 if inv_rec.handling_days() > 0 {
641 post_proc = AddPostProc::Pending(self.current_day() + inv_rec.handling_days() as i32);
643 }
648 else {
649 let mut shipping_order = order.clone();
652 shipping_order.status = OrderStatus::Shipped;
653 post_proc = AddPostProc::Shipping(shipping_order);
654 }
655 *inv_rec.stock_mut() -= order.quantity as i64;
657 }
658 }
659
660 Ok((step, post_proc))
661 }
662
663 #[inline]
664 fn add_step(&self, step: Step) -> Result<()> {
665 self.add_steps([step])
666 }
667
668 fn add_steps<S: IntoIterator<Item = Step>>(&self, new_steps: S) -> Result<()> {
669 let mut steps = self.steps_mut();
670 self.adjust_capacity(&mut *steps, 10_000);
671
672 let mut backordered_new: Vec<(i32, usize)> = Vec::new();
674 let mut backordered_again: Vec<(i32, usize)> = Vec::new();
675 let mut pending_orders: Vec<(i32, usize)> = Vec::new();
677
678 for mut step in new_steps.into_iter() {
679 let mut post_proc = AddPostProc::None;
680 match step {
681 Step::AddOrder(_) | Step::UpdateOrder(_) => {
682 (step, post_proc) = self.process_order(step)?;
683 }
684 _ => (),
685 }
686
687 steps.push(step);
688 let step_idx = steps.len() - 1;
689
690 match post_proc {
691 AddPostProc::Pending(day) => {
692 pending_orders.push((day, step_idx));
693 }
694 AddPostProc::BackorderNew(product_id) => {
695 backordered_new.push((product_id, step_idx));
696 }
697 AddPostProc::BackorderAgain(product_id) => {
698 backordered_again.push((product_id, step_idx));
699 }
700 AddPostProc::Shipping(shipping_order) => {
701 steps.push(Step::UpdateOrder(shipping_order));
708 }
709 AddPostProc::None => (),
710 }
711 }
712
713 if !pending_orders.is_empty() {
714 let mut pendings = self.pending_orders_mut();
715 for (day, step_idx) in pending_orders {
716 pendings.entry(day).or_default().push(step_idx);
717 }
718 }
719
720 if !(backordered_new.is_empty() && backordered_again.is_empty()) {
721 let mut backorders = self.backorders_mut();
722 for (product_id, step_idx) in backordered_again {
723 backorders[product_id as usize].push_front(step_idx);
724 }
725 for (product_id, step_idx) in backordered_new {
726 backorders[product_id as usize].push_back(step_idx);
727 }
728 }
729
730 self.reporter().set_scenario_lines(steps.len());
731 self.reporter().set_scenario_capacity(steps.capacity());
732
733 Ok(())
734 }
735
736 fn init_products(&self) -> Result<()> {
738 let mut products = self.products_mut();
739 let random_pool = self.random_pool();
740 let price_model = self.product_price_model();
741 let product_count = self.product_count() as usize;
742 let geometric = Geometric::new(0.7).unwrap();
743 let mut rng = rand::rng();
744
745 products.reserve(product_count);
746 self.backorders_mut().resize(product_count, VecDeque::new());
747
748 for id in 0..self.product_count() {
749 let price = price_model.next_price();
750 let daily_quotient = random_pool.next_rand();
751 let expected_return_rate = random_pool.next_rand() * 0.3;
753 let ship_terms = 3.0 + random_pool.next_rand() * 18.0;
757 let supplier_inaccuracy = random_pool.next_rand() * 0.15 + 0.05;
759 let supplier_tardiness = random_pool.next_rand() * 1.6 - 0.9;
761 let product = Product::builder()
762 .id(id)
763 .name(format!("Product {id}"))
764 .price(price)
765 .daily_quotient(daily_quotient)
766 .expected_return_rate(expected_return_rate)
767 .stock_supplies_in(ship_terms)
768 .supplier_inaccuracy(supplier_inaccuracy)
769 .supplier_tardiness(supplier_tardiness)
770 .product_model(self.product_price_model().clone())
771 .build()
772 .unwrap();
773 let step = Step::AddProduct(product.clone().into());
774 self.add_step(step)?;
775
776 products.push(product);
777
778 let handling_days = geometric.sample(&mut rng).min(255) as i16;
780 let inv_rec = InventoryRecord::new(id, 0, handling_days);
781 self.inventory_mut().push(inv_rec.clone());
782 self.add_step(Step::AddInventoryRecord(inv_rec.into()))?;
783 }
784
785 Ok(())
786 }
787
788 fn register_customers(&self) -> Result<()> {
789 let new_customers = self.next_day_customers();
790
791 for _ in 0..new_customers {
792 let customer = self.random_pool().next_customer();
793 self.add_step(Step::AddCustomer(customer.clone().into()))?;
794 if self.customers().contains_key(customer.email()) {
795 panic!("Customer with email {} already exists", customer.email());
796 }
797 self.customers_mut()
798 .insert(customer.email().clone(), Arc::new(customer));
799 }
800
801 if self.current_day() < self.period {
802 let next_cust = self.customer_model().expected_customers(self.current_day() + 1).round() as usize
803 - self.customers().len();
804 self.set_next_day_customers(next_cust);
805
806 let mut customer_counts = self.customer_counts_mut();
808 while customer_counts.len() >= self.customer_history_length() {
809 customer_counts.pop_front();
810 }
811 customer_counts.push_back(self.customers().len() as u32);
812 }
813
814 Ok(())
815 }
816
817 fn customer_login(&self, customer_id: i32) -> Result<()> {
818 let sess_idx = customer_id as usize - 1;
819 if let Some(existing_session) = &mut self.customer_sessions_mut()[sess_idx] {
820 if existing_session.expires_on >= self.current_day() {
821 existing_session.expires_on = self.current_day() + LOGIN_EXPIRE_DAYS;
823 self.add_step(Step::UpdateSession(existing_session.clone()))?;
824 return Ok(());
825 }
826 }
827
828 let session = DbSession {
830 id: self.next_session_id(),
831 customer_id: Some(customer_id),
832 expires_on: self.current_day() + LOGIN_EXPIRE_DAYS,
833 };
834 self.customer_sessions_mut()[customer_id as usize - 1] = Some(session.clone());
835 self.add_step(Step::AddSession(session))?;
836 Ok(())
837 }
838
839 fn order_shipments(&self, initial: bool) -> Result<()> {
840 self.clear_total_backordered();
841
842 let customer_count = self.customers().len() as f64;
843 let products = self.products();
844 let inventory = self.inventory();
845 let mut shipments = self.shipments_mut();
846 let anticipated_growth = {
847 let base = self
848 .customer_counts()
849 .front()
850 .map(|rec| *rec as f64)
851 .unwrap_or(self.customer_model().initial_customers());
852 self.customers().len() as f64 / base
853 };
854
855 'inv_record: for product in products.iter() {
856 let product_id = product.id();
857
858 let estimate_sales = self.total_backordered() as i32
862 + (product.daily_estimate() * customer_count * anticipated_growth * product.supplies_in()).round()
863 as i32;
864 let being_shipped = *self.product_shipping_mut().entry(product_id).or_insert(0);
865
866 if let Some(tracked_product) = self.track_product() {
867 if tracked_product == product_id as usize {
868 self.reporter().out(&format!(
869 "Product {}: being shipped: {}, estimate sales: {}, inventory: {}",
870 product_id,
871 being_shipped,
872 estimate_sales,
873 inventory.get(tracked_product).map_or(0, |rec| rec.stock())
874 ))?;
875 }
876 }
877
878 if being_shipped > estimate_sales || estimate_sales == 0 {
879 continue 'inv_record;
880 }
881
882 let mut batch_size = estimate_sales - being_shipped;
883
884 let arrives_on = if initial {
885 1
886 }
887 else {
888 if let Some(inv_rec) = inventory.get(product_id as usize) {
889 if inv_rec.stock() > batch_size as i64 {
891 continue 'inv_record;
892 }
893 batch_size -= inv_rec.stock() as i32;
895 }
896 self.current_day() + product.supplies_in().ceil().to_i32().unwrap()
899 };
900
901 if batch_size > 0 {
902 let shipment = Shipment::builder()
903 .product_id(product_id)
904 .batch_size(batch_size)
905 .arrives_on(arrives_on)
906 .build()
907 .unwrap();
908
909 if let Some(tracked_product) = self.track_product() {
910 if tracked_product == product_id as usize {
911 self.reporter().out(&format!(
912 "Order shipment for product {}: {} units, arrives in {} days",
913 product_id,
914 shipment.batch_size(),
915 shipment.arrives_on() - self.current_day()
916 ))?;
917 }
918 }
919
920 *self.product_shipping_mut().get_mut(&product_id).unwrap() += shipment.batch_size();
921 shipments.entry(arrives_on).or_default().push(shipment);
922 }
923 }
924
925 Ok(())
926 }
927
928 fn replenish_inventory(&self) -> Result<()> {
930 let mut all_shipments = self.shipments_mut();
931 let products = self.products();
932 let mut inv_steps = Vec::new();
933
934 if let Some(shipments) = all_shipments.remove(&self.current_day()) {
957 let mut inventory = self.inventory_mut();
958 let mut affected_products = vec![false; self.product_count() as usize];
959 for shipment in shipments {
960 let product = products.get(shipment.product_id() as usize).unwrap();
961 let product_id = product.id() as usize;
962 let batch_size = shipment.batch_size();
963 let inv_rec = inventory.get_mut(product_id).unwrap();
964
965 affected_products[product_id] = true;
966
967 let new_stock = inv_rec.stock() + batch_size as i64;
974 *inv_rec.stock_mut() = new_stock;
975
976 if let Some(tracked_product) = self.track_product() {
977 if tracked_product == product_id {
978 self.reporter().out(&format!(
979 "Arrived shipment for product {product_id}: {batch_size} units, new stock: {new_stock}",
980 ))?;
981 }
982 }
983
984 inv_steps.push(Step::AddStock(IncomingShipment {
988 product_id: shipment.product_id(),
989 batch_size: shipment.batch_size(),
990 }));
991
992 *self.product_shipping_mut().get_mut(&product.id()).unwrap() -= shipment.batch_size();
1006
1007 }
1015
1016 for (product_id, updated) in affected_products.iter().enumerate() {
1017 if *updated {
1018 let mut backorders = self.backorders_mut();
1019
1020 if let Some(orders) = backorders.get_mut(product_id) {
1021 let inv_rec = &inventory[product_id];
1029
1030 while !orders.is_empty() {
1031 let mut order = self.order_by_idx(*orders.front().unwrap())?;
1032
1033 if order.product_id != product_id as i32 {
1034 return Err(simerr!(
1035 "Backorder {} product ID {} does not match the shipment's product ID {}",
1036 order.id,
1037 order.product_id,
1038 product_id
1039 ));
1040 }
1041
1042 if order.quantity as i64 <= inv_rec.stock() {
1043 let _ = orders.pop_front().unwrap();
1045 order.status = OrderStatus::Recheck;
1046 inv_steps.push(Step::UpdateOrder(order));
1048 }
1049 else {
1050 break;
1052 }
1053 }
1054 }
1055 }
1056 }
1057 }
1058
1059 self.add_steps(inv_steps)
1071 }
1072
1073 fn fulfill_pending(&self) -> Result<()> {
1074 let mut pending_orders = self.pending_orders_mut();
1075
1076 let Some(indicies) = pending_orders.remove(&self.current_day())
1077 else {
1078 return Ok(());
1079 };
1080
1081 let order_count = indicies.len();
1084 let batch_count = (order_count / 1000 + 1).min(self.worker_count());
1085 let batch_size = order_count / batch_count;
1086
1087 let mut tasks = vec![];
1088
1089 for idx_chunk in indicies.chunks(batch_size) {
1090 tasks.push(self.submit_task(TaskData::Pending(idx_chunk.to_vec()))?);
1091 }
1092
1093 for task in tasks {
1094 task.wait()?;
1095 }
1096
1097 Ok(())
1098 }
1099
1100 fn place_orders(&self) -> Result<()> {
1101 let customers = self.customers();
1102 let products = self.products();
1103
1104 let total_sale_rate: f64 = products.iter().map(|p| p.daily_estimate()).sum();
1106 let prod_sale_rate: Arc<Vec<(_, _)>> = Arc::new(
1107 products
1108 .iter()
1109 .map(|p| (p.id(), p.daily_estimate() / total_sale_rate))
1110 .collect(),
1111 );
1112
1113 let mut task_results = Vec::new();
1114
1115 let cust_count = customers.len();
1116 let task_count = self.worker_count();
1117 let batch_size = (cust_count / task_count) + (cust_count % task_count > 0) as usize;
1118
1119 for cust_chunk in customers.values().cloned().collect::<Vec<_>>().chunks(batch_size) {
1120 let cust_chunk = cust_chunk.to_vec();
1121
1122 let task = PurchaseTask {
1123 customers: cust_chunk,
1124 product_interests: prod_sale_rate.clone(),
1125 };
1126
1127 task_results.push(self.submit_task(TaskData::Purchase(task))?);
1128 }
1129
1130 let mut succeed = true;
1131 for tr in task_results {
1132 if let Err(err) = tr.wait() {
1133 self.reporter().out(
1134 &style(format!("Failed to process task: {err}"))
1135 .red()
1136 .bright()
1137 .to_string(),
1138 )?;
1139 succeed = false;
1140 }
1141 }
1142
1143 if succeed {
1144 Ok(())
1145 }
1146 else {
1147 Err(simerr!("Order processing was not successful"))
1148 }
1149 }
1150
1151 fn process_returns(&self) -> Result<()> {
1152 let task_result = self.submit_task(TaskData::Returns)?;
1153
1154 task_result.wait()
1155 }
1156
1157 #[allow(unused)]
1158 fn report<S: Display>(&self, msg: S) -> Result<()> {
1159 self.reporter().out(&msg.to_string())
1160 }
1161
1162 fn order_by_idx(&self, idx: usize) -> Result<DbOrder> {
1163 let steps = self.steps();
1164 let Some(step) = steps.get(idx)
1165 else {
1166 return Err(simerr!("Step index {} not found", idx));
1167 };
1168 Ok(match step {
1169 Step::AddOrder(ref order) => order,
1170 Step::UpdateOrder(ref order) => order,
1171 _ => Err(simerr!("Script step at index {} is not an order", idx))?,
1172 }
1173 .clone())
1174 }
1175
1176 pub fn customer_model(&self) -> &CustomerModel {
1177 self.customer_model.as_ref().expect("Customer model not initialized")
1178 }
1179
1180 fn setup_customer_model(&mut self) {
1181 self.customer_model = Some(
1182 CustomerModel::builder()
1183 .initial_customers(self.initial_customers() as f64)
1184 .market_capacity(self.market_capacity() as f64)
1185 .inflection_point(self.inflection_point() as f64)
1186 .growth_rate(self.growth_rate())
1187 .build()
1188 .unwrap(),
1189 );
1190 }
1191
1192 fn build_worker_count(&self) -> usize {
1193 let threads = (num_cpus::get()).max(1);
1195 self.reporter().set_task_count(threads);
1196 threads
1197 }
1198
1199 fn build_task_tx(&self) -> Sender<Task> {
1200 let threads = self.worker_count();
1202 let (tx, rx) = crossbeam::channel::unbounded();
1203 for thread_id in 0..threads {
1204 let rx = rx.clone();
1205 let myself = self.myself().unwrap();
1206 let task_handler = thread::Builder::new()
1207 .name(format!("task_worker_{thread_id}"))
1208 .spawn(move || {
1209 let mut retry = 1;
1210 while let Err(err) = myself.task_worker(thread_id, rx.clone()) {
1211 eprintln!("Task worker thread {thread_id} failed [{retry}]: {err:?}");
1212 retry += 1;
1213 if retry > 3 {
1214 eprintln!("Task worker thread {thread_id} failed too many times, exiting");
1215 break;
1216 }
1217 }
1218 thread_id
1219 })
1220 .unwrap();
1221 self.task_handlers_mut().push(task_handler);
1222 }
1223
1224 tx
1225 }
1226
1227 fn build_reporter(&self) -> Reporter {
1228 if self.quiet() {
1229 Reporter::Quiet
1230 }
1231 else {
1232 Reporter::Formatted(child_build!(self, FormattedReporter).unwrap())
1233 }
1234 }
1235
1236 fn build_random_pool(&self) -> Arc<RndPool> {
1237 child_build!(
1238 self,
1239 RndPool {
1240 min_customer_orders: self.min_customer_orders,
1241 max_customer_orders: self.max_customer_orders,
1242 customers_full: 100,
1243 }
1244 )
1245 .unwrap()
1246 }
1247
1248 fn build_return_probs(&self) -> BTreeMap<i32, f64> {
1249 let mut return_probs = BTreeMap::new();
1250 let return_window = self.return_window() as f64;
1251 for product in self.products().iter() {
1252 return_probs.insert(product.id(), product.expected_return_rate() / return_window);
1253 }
1254 return_probs
1255 }
1256
1257 fn build_total_backordered(&self) -> usize {
1258 self.backorders().iter().map(|bo| bo.len()).sum()
1259 }
1260
1261 fn build_product_price_model(&self) -> Arc<ProductModel> {
1262 ProductModel::builder().build().unwrap()
1263 }
1264
1265 fn build_customer_sessions(&self) -> Vec<Option<DbSession>> {
1266 vec![None; self.market_capacity() as usize]
1267 }
1268
1269 fn build_session_base(&self) -> i64 {
1270 self.current_day() as i64 * 10i64.pow((self.market_capacity() as i64 * SESSION_FACTOR as i64).ilog10() + 1)
1271 }
1272}