wb_cache/test/simulation/
scriptwriter.rs

1pub mod entity;
2pub mod math;
3pub mod model;
4pub mod reporter;
5pub mod rnd_pool;
6pub mod steps;
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::collections::HashMap;
11use std::collections::VecDeque;
12use std::fmt::Display;
13use std::sync::atomic::AtomicI64;
14use std::sync::Arc;
15use std::thread;
16
17use console::style;
18use crossbeam::channel::Receiver;
19use crossbeam::channel::Sender;
20use entity::customer::Customer;
21use entity::inventory::IncomingShipment;
22use entity::inventory::InventoryCheck;
23use entity::inventory::InventoryRecord;
24use entity::order::Order;
25use entity::product::Product;
26use entity::shipment::Shipment;
27use fieldx_plus::child_build;
28use fieldx_plus::fx_plus;
29use model::customer::CustomerModel;
30use model::product::ProductModel;
31use rand::Rng;
32use rand_distr::num_traits::ToPrimitive;
33use rand_distr::Bernoulli;
34use rand_distr::Distribution;
35use rand_distr::Geometric;
36use rand_distr::Normal;
37use rand_distr::Poisson;
38use reporter::FormattedReporter;
39use reporter::Reporter;
40use reporter::SwReporter;
41use reporter::TaskStatus;
42use rnd_pool::RndPool;
43use sea_orm::prelude::Uuid;
44use steps::ScriptTitle;
45use steps::Step;
46
47use super::db::entity::Order as DbOrder;
48use super::db::entity::Session as DbSession;
49use super::types::simerr;
50use super::types::OrderStatus;
51use super::types::Result;
52
// Per-thread scratch buffer of steps. The worker task methods (`task_purchases`, `task_returns`, `task_pending`)
// collect their steps here and then drain the buffer into the shared step list, so the allocation is reused from one
// task to the next on the same thread.
thread_local! {
    static BATCH_STEPS: RefCell<Vec<Step>> = const { RefCell::new(Vec::new()) };
}

/// Number of days added to the current day to compute the expiration of a customer login session, both when it is
/// created and when it is refreshed.
const LOGIN_EXPIRE_DAYS: i32 = 7;
/// Multiplier applied to the current number of customers to get the anticipated number of anonymous (non-login)
/// sessions per day.
const SESSION_FACTOR: usize = 10;
59
/// Follow-up action that `add_steps` performs after `process_order` has handled an order step.
enum AddPostProc {
    /// A new order could not be served from stock and was backordered. Carries the product ID; the order is queued at
    /// the back of that product's backorder queue.
    BackorderNew(i32),
    /// A re-checked order is still short of stock. Carries the product ID; the order is queued at the front of that
    /// product's backorder queue.
    BackorderAgain(i32),
    /// Pending until the specified day.
    Pending(i32),
    /// Same-day shipping: carries a copy of the order with its status already set to `Shipped`, to be emitted as an
    /// extra update step right after the original one.
    Shipping(DbOrder),
    /// Nothing else to do for this step.
    None,
}
69
/// Payload of a task sent to the worker threads; selects which `task_*` method the worker runs.
#[derive(Clone, Debug)]
enum TaskData {
    /// Generate purchase orders for customers.
    Purchase(PurchaseTask),
    /// Generate order returns.
    Returns,
    /// Fulfill pending orders. Takes list of order indices in the steps array.
    Pending(Vec<usize>),
    /// Generate temporary sessions for non-login visitors.
    Sessions,
}
81
/// Input of a purchase-generation task.
#[derive(Clone, Debug)]
struct PurchaseTask {
    /// The customers whose purchases this task generates.
    customers:         Vec<Arc<Customer>>,
    /// Pairs of `(product ID, interest)`. The interest is multiplied by the customer's daily purchases to get the
    /// Poisson rate of items ordered for that product.
    product_interests: Arc<Vec<(i32, f64)>>,
}
87
88#[derive(Clone, Debug)]
89struct Task {
90    data: TaskData,
91    tx:   Sender<Result<()>>,
92}
93
94impl Task {
95    fn done(&self, result: Result<()>) -> Result<()> {
96        Ok(self.tx.send(result)?)
97    }
98}
99
100struct TaskResult {
101    rx: Receiver<Result<()>>,
102}
103
104impl TaskResult {
105    fn wait(&self) -> Result<()> {
106        self.rx.recv()?
107    }
108}
109
/// Generate a scenario to be executed by the Company simulator.
///
/// The scenario is a flat list of [`Step`]s accumulated in `steps`. The remaining fields hold the simulation state
/// used while the steps are being generated.
#[fx_plus(parent, sync, rc, builder(post_build, opt_in))]
pub struct ScriptWriter {
    /// Suppress all output.
    #[fieldx(get(copy), builder)]
    quiet:  bool,
    /// For how many days the scenario will run.
    #[fieldx(default(365), builder)]
    period: i32,

    /// The day currently being generated. It is 0 during the initial setup and then runs from 1 to `period`.
    #[fieldx(lock, get(copy), set, default(0))]
    current_day: i32,

    /// How many customers we have on day 1.
    #[fieldx(get(copy), default(1), builder)]
    initial_customers: u32,

    /// The maximum number of customers the company can have by the end of the simulation period.
    #[fieldx(get(copy), default(10_000), builder)]
    market_capacity: u32,

    /// Where customer base growth reaches its peak.
    #[fieldx(get(copy), default(3_000), builder)]
    inflection_point: u32,

    /// Company's "success" rate – how fast the customer base grows.
    #[fieldx(get(copy), default(0.05), builder)]
    growth_rate: f64,

    // NOTE(review): presumably the lower bound of a customer's expected daily orders, mirroring
    // `max_customer_orders` below; not used in the visible part of this file, so confirm.
    #[fieldx(get(copy), default(0.15), builder)]
    min_customer_orders: f64,
    // This value is not a hard upper limit but the boundary within which 90% of randomly generated customer expected
    // daily orders will fall. The remaining 10% might be higher than this value.
    #[fieldx(get(copy), default(3.0), builder)]
    max_customer_orders: f64,

    /// Number of products to generate; product IDs run from 0 to `product_count - 1`.
    #[fieldx(get(copy), default(10), builder)]
    product_count: i32,

    /// Upper bound, in days, of the random delay between a purchase and its planned return.
    #[fieldx(get(copy), default(30), builder)]
    return_window: i32,

    /// Index in the vector is the product's ID.
    #[fieldx(lock, get, get_mut)]
    products: Vec<Product>,

    /// Product return probability per day, by ID.
    #[fieldx(lazy, get)]
    return_probs: BTreeMap<i32, f64>,

    /// The scenario steps generated so far. Backorder and pending-order bookkeeping refers to orders by their index
    /// in this vector.
    #[fieldx(lock, get, get_mut, set, default(Vec::with_capacity(100_000_000)))]
    steps: Vec<Step>,

    /// "Planned" returns, per day.
    #[fieldx(inner_mut, get_mut)]
    returns: HashMap<i32, Vec<Order>>,

    /// Backorders by their indices in the steps array, per product ID. When inventory is replenished, these orders will
    /// be processed first.
    ///
    /// Since product ID is its vector index, we use a Vec here too.
    #[fieldx(lock, get, get_mut)]
    backorders: Vec<VecDeque<usize>>,

    /// Total number of backordered orders, as reported by `log_progress`. The value is built lazily and is cleared at
    /// the start of `order_shipments`.
    #[fieldx(lazy, clearer, get(copy), default(0))]
    total_backordered: usize,

    /// Pending orders by their indices in the steps array, keyed by shipping day.
    /// This is a map from the day to a queue of orders.
    #[fieldx(lock, get, get_mut)]
    pending_orders: HashMap<i32, Vec<usize>>,

    /// Inventory record per product ID. Since product ID is its vector index in self.products, we use a Vec here too.
    #[fieldx(lock, get, get_mut)]
    inventory: Vec<InventoryRecord>,

    /// Map shipment day into shipments.
    #[fieldx(lock, get, get_mut)]
    shipments: BTreeMap<i32, Vec<Shipment>>,

    /// Map product ID into the number of items currently shipping.
    #[fieldx(lock, get, get_mut)]
    product_shipping: BTreeMap<i32, i32>,

    /// Map unique email into customer.
    #[fieldx(lock, get, get_mut, default(HashMap::new()))]
    customers: HashMap<String, Arc<Customer>>,

    /// Customer counts over the last few days.
    #[fieldx(lock, get, get_mut)]
    customer_counts: VecDeque<u32>,

    /// Maximum number of entries kept in `customer_counts`.
    #[fieldx(get(copy), default(7))]
    customer_history_length: usize,

    /// Keep track of the number of customers that are expected to be registered the next day.
    #[fieldx(inner_mut, get(copy), set)]
    next_day_customers: usize,

    /// Source of random values and of randomly generated customers.
    #[fieldx(lazy, get(clone))]
    random_pool: Arc<RndPool>,

    /// Customer base growth model; provides the expected number of customers for a given day.
    #[fieldx(builder(off))]
    customer_model: Option<CustomerModel>,

    /// Model that generates product prices; also handed to every generated product.
    #[fieldx(lazy, get)]
    product_price_model: Arc<ProductModel>,

    // --- Workers pool related
    /// Sending half of the channel over which tasks are submitted to the worker threads.
    #[fieldx(lazy, clearer, private, builder(off), get)]
    task_tx:       Sender<Task>,
    /// Number of worker threads; used to size the per-thread step buffers.
    #[fieldx(lazy, private, builder(off), get(copy))]
    worker_count:  usize,
    /// Join handles of the worker threads; they are joined at the end of `direct`.
    #[fieldx(lock, private, get, get_mut, builder(off))]
    task_handlers: Vec<std::thread::JoinHandle<usize>>,

    /// Progress and message output.
    #[fieldx(lazy, private, get, builder(off))]
    reporter: Reporter,

    /// Index of the product with the longest backorder queue. Set by `log_progress` while the total number of
    /// backorders is above its reporting threshold, and cleared otherwise.
    #[fieldx(lock, clearer, get(copy), set, builder(off))]
    track_product: usize,

    /// Monotonic counter that is added to `session_base` to form session IDs.
    next_session_id: AtomicI64,

    // Base number to form session IDs. It has the format of <current_day> * 10^<number_of_digits_in_market_capacity>.
    #[fieldx(lazy, private, clearer, get(copy), builder(off))]
    session_base: i64,

    /// Latest login session of every customer, indexed by `customer_id - 1`. `None` means the customer has not logged
    /// in yet.
    #[fieldx(lazy, inner_mut, get, get_mut, builder(off))]
    customer_sessions: Vec<Option<DbSession>>,
}
242
243impl ScriptWriter {
244    fn post_build(mut self) -> Self {
245        self.set_next_day_customers(self.initial_customers() as usize);
246
247        self.setup_customer_model();
248
249        // Do not let the model saturate too quickly. Limit the expected growth so that it reaches 75% of the market
250        // capacity at 4/5 of the total modeling period. This is a shortcoming of the Richards model:
251        // it is not as asymptotic as it should be. Ideally, it would never actually reach full market capacity.
252        self.customer_model()
253            .adjust_growth_rate(self.customer_model().market_capacity() * 0.75, self.period * 4 / 5);
254
255        self
256    }
257
258    pub fn create(&self) -> Result<Vec<Step>> {
259        self.direct()?;
260        self.reporter().stop()?;
261        self.clear_task_tx();
262        Ok(self.set_steps(Vec::new()))
263    }
264
265    pub fn direct(&self) -> Result<()> {
266        self.reporter().start()?;
267
268        self.add_step(Step::Title(ScriptTitle {
269            period:          self.period,
270            products:        self.product_count(),
271            market_capacity: self.market_capacity,
272        }))?;
273
274        self.init_products()?;
275        self.register_customers()?;
276        self.order_shipments(true)?;
277
278        for day in 1..=self.period {
279            self.set_current_day(day);
280            self.clear_session_base();
281            self.add_step(Step::Day(day))?;
282            let sessions_task = self.submit_task(TaskData::Sessions)?;
283            self.replenish_inventory()?;
284            self.fulfill_pending()?;
285            self.register_customers()?;
286            self.process_returns()?;
287            self.place_orders()?;
288
289            // The end of the day processing.
290            self.order_shipments(false)?;
291            sessions_task.wait()?;
292            self.add_step(Step::CollectSessions)?;
293            self.log_progress()?;
294        }
295
296        self.reporter().out("--- Simulation finished ---")?;
297        self.clear_task_tx();
298        while !self.task_handlers().is_empty() {
299            self.task_handlers_mut().pop().unwrap().join().unwrap();
300        }
301        self.reporter().out("--- All threads finished ---")?;
302
303        self.reporter().stop()?;
304
305        Ok(())
306    }
307
308    fn maybe_init_batch_steps(&self, batch_steps: &mut Vec<Step>) {
309        if batch_steps.capacity() == 0 {
310            batch_steps.reserve(
311                self.customer_model().market_capacity() as usize * self.max_customer_orders() as usize * 2
312                    / self.worker_count(),
313            );
314        }
315    }
316
317    fn submit_task(&self, data: TaskData) -> Result<TaskResult> {
318        let (tx, rx) = crossbeam::channel::unbounded();
319        let task = Task { data, tx };
320        let task_result = TaskResult { rx };
321        self.task_tx().send(task)?;
322        Ok(task_result)
323    }
324
    /// Generates today's purchase steps for the customers of `task`.
    ///
    /// The work is done in two passes over the customers:
    ///
    /// 1. Obtain each customer's daily purchases and emit a login (session) step for those who purchase.
    /// 2. For every purchasing customer walk over the product interests, possibly emit a product view step, sample
    ///    the number of items to order and emit an `AddOrder` step. Some orders are also scheduled for a return on a
    ///    later day.
    ///
    /// Order steps are collected in the thread-local `BATCH_STEPS` buffer and moved to the shared step list in one
    /// `add_steps` call; the planned returns are then merged into `self.returns`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `Customer::daily_purchases`, `customer_login`, the reporter and `add_steps`.
    ///
    /// # Panics
    ///
    /// Panics if a Bernoulli or Poisson distribution cannot be built from the computed parameter, or if a product ID
    /// has no entry in the return probabilities.
    fn task_purchases(&self, task: &PurchaseTask) -> Result<()> {
        let mut rng = rand::rng();
        let thread = thread::current();
        let thread_id = thread.name().unwrap_or("unknown");
        // Work on a private copy of the return probabilities for the duration of the task.
        let return_probs = self.return_probs().clone();

        BATCH_STEPS.with_borrow_mut(|batch_steps| -> Result<()> {
            self.maybe_init_batch_steps(batch_steps);

            // Remembered to detect, and report, growth of the buffer during this task.
            let init_capacity = batch_steps.capacity();
            let mut crecords = Vec::with_capacity(task.customers.len());
            let mut expected_steps = 0;
            // Returns planned by this task, keyed by the day of return.
            let mut planned_returns: BTreeMap<i32, Vec<Order>> = BTreeMap::new();
            let return_window = self.return_window() as f32;
            let current_day = self.current_day();

            for customer in task.customers.iter() {
                let customer_purchases = customer.daily_purchases()?;
                expected_steps += customer_purchases as usize;
                crecords.push((customer, customer_purchases));

                // If the customer is likely to purchase today, then we need to refresh their session.
                // Otherwise, we simulate a scenario where the customer visits to check if they are interested.
                // This is based on their expected daily purchases, so customers with an expectation of 1 or more
                // are more likely to visit every day.
                //
                // NOTE(review): the Bernoulli branch is only reached when `customer_purchases > 0.0` is false. For
                // exactly 0.0 its probability is `1 - exp(-0) = 0`, so it never adds a login; for a negative value
                // the probability is negative and the `unwrap()` would panic. Confirm whether a different rate was
                // meant to be used here.
                if customer_purchases > 0.0 || Bernoulli::new(1.0 - (-customer_purchases).exp()).unwrap().sample(&mut rng) {
                    self.customer_login(customer.id())?;
                }
            }

            let mut total_purchases = 0;
            // let mut batch_steps = Vec::with_capacity(init_capacity);

            for (customer, customer_purchases) in crecords {
                let customer_id = customer.id();

                // self.reporter()
                //     .out(format!("Customer {}: {} purchases", customer.id(), customer_purchases))?;

                if customer_purchases == 0.0 {
                    // eprint!("  No purchases for customer {}", customer.id());
                    continue;
                }

                let mut purchases = 0;

                for (product_id, product_interest) in task.product_interests.iter() {

                    // The customer may want to see the product page before ordering it.
                    if rng.random_range(0.0..1.0) <= self.products()[*product_id as usize].view_probability() {
                        batch_steps.push(Step::ViewProduct(*product_id));
                    }

                    // The number of items ordered is Poisson-distributed around the customer's purchases scaled by
                    // the interest in this product.
                    // NOTE(review): `unwrap()` panics if the distribution rejects the computed rate (for example a
                    // zero interest) — confirm the inputs are always valid.
                    let product_items = Poisson::new(customer_purchases * product_interest)
                        .unwrap()
                        .sample(&mut rng)
                        .round() as i32;

                    if product_items == 0 {
                        // eprint!("  No purchases for customer {}", customer.id());
                        continue;
                    }

                    let order = Order {
                        id:          Uuid::new_v4(),
                        customer_id,
                        product_id:  *product_id,
                        quantity:    product_items,
                        status:      OrderStatus::New,
                    };

                    // Decide right away whether this order will be returned and, if so, on which day.
                    if rng.random_range(0.0..1.0) < *return_probs.get(product_id).unwrap() {
                        let on_day =
                            rng.random_range(0.0..return_window).round() as i32 + 1 + current_day;
                        // NOTE(review): a return that lands exactly on the last day (`on_day == self.period`) is
                        // dropped, although `direct` iterates over `1..=self.period` — confirm this is intended.
                        if on_day < self.period {
                            planned_returns
                                .entry(on_day)
                                .or_default()
                                .push(order.clone());
                        }
                    }

                    self.adjust_capacity(batch_steps, 100);
                    batch_steps.push(Step::AddOrder(order.into()));

                    purchases += product_items;
                }

                total_purchases += purchases;
            }

            // Report when the pre-allocated buffer turned out to be too small.
            if batch_steps.capacity() > init_capacity {
                self.reporter().out(&format!(
                    "Thread {}: Purchase task steps capacity increased from {} to {} (expected steps: {}, total steps: {}, batch steps: {})",
                    thread_id,
                    init_capacity,
                    batch_steps.capacity(),
                    expected_steps,
                    total_purchases,
                    batch_steps.len()
                ))?;
            }

            // Move the collected steps into the shared list; the buffer keeps its capacity for the next task.
            self.add_steps(batch_steps.drain(0..batch_steps.len()))?;

            let mut returns = self.returns_mut();
            for (day, orders) in planned_returns {
                returns.entry(day).or_default().extend(orders);
            }

            Ok(())
        })
    }
438
439    fn task_returns(&self) -> Result<()> {
440        BATCH_STEPS.with_borrow_mut(|batch_steps| -> Result<()> {
441            self.maybe_init_batch_steps(batch_steps);
442            let mut returns = self.returns_mut();
443
444            // Process prepared returns first.
445            if let Some(today_returns) = returns.remove(&self.current_day()) {
446                // self.reporter().out(format!(
447                //     "Processing {} returns for day {}",
448                //     today_returns.len(),
449                //     self.current_day()
450                // ))?;
451
452                for mut order in today_returns {
453                    order.status = OrderStatus::Refunded;
454                    batch_steps.push(Step::UpdateOrder(order.into()));
455                }
456            }
457
458            self.add_steps(batch_steps.drain(0..batch_steps.len()))?;
459
460            Ok(())
461        })
462    }
463
464    fn task_pending(&self, indicies: &[usize]) -> Result<()> {
465        BATCH_STEPS.with_borrow_mut(|batch_steps| -> Result<()> {
466            self.maybe_init_batch_steps(batch_steps);
467
468            for step_idx in indicies.iter().copied() {
469                let mut order = self.order_by_idx(step_idx)?;
470
471                order.status = OrderStatus::Shipped;
472                batch_steps.push(Step::UpdateOrder(order));
473            }
474
475            self.add_steps(batch_steps.drain(0..batch_steps.len()))?;
476
477            Ok(())
478        })
479    }
480
481    fn next_session_id(&self) -> i64 {
482        self.next_session_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + self.session_base()
483    }
484
485    fn task_sessions(&self) -> Result<()> {
486        // Non-login sessions are those where a user visited the site but did not register or log in.  In such cases, a
487        // new session is always created; however, it is short-lived with an expiration set to the next day.  The
488        // expected number of temporary sessions depends on site popularity, which is measured by the number of existing
489        // customers.  Anticipated traffic is customer_count * 10, roughly equivalent to a 10% conversion rate from
490        // advertising—an unrealistically good value.
491        // Sessions are added one by one to intersperse them with other steps.
492
493        let day = self.current_day();
494        let anticipated_sessions = (self.customers().len() * SESSION_FACTOR) as f64;
495        let sigma = anticipated_sessions * 0.2;
496
497        let todays_sessions = Normal::new(anticipated_sessions, sigma)
498            .map_err(|err| simerr!("Normal distribution: {}", err))?
499            .sample(&mut rand::rng())
500            .round() as u32;
501
502        for _ in 0..todays_sessions {
503            self.add_step(Step::AddSession(DbSession {
504                id:          self.next_session_id(),
505                customer_id: None,
506                expires_on:  day + 3,
507            }))?;
508        }
509
510        Ok(())
511    }
512
513    fn task_worker(&self, thread_id: usize, rx: Receiver<Task>) -> Result<()> {
514        let thread_name = thread::current()
515            .name()
516            .ok_or(simerr!("Worker thread must have a name"))?
517            .to_owned();
518
519        loop {
520            let task = {
521                let Ok(task) = rx.recv()
522                else {
523                    break;
524                };
525                task
526            };
527
528            self.reporter().set_task_status(thread_id, TaskStatus::Busy);
529
530            task.done(
531                match task.data {
532                    TaskData::Purchase(ref p_task) => self.task_purchases(p_task),
533                    TaskData::Returns => self.task_returns(),
534                    TaskData::Pending(ref p_task) => self.task_pending(p_task),
535                    TaskData::Sessions => self.task_sessions(),
536                }
537                .inspect_err(|err| {
538                    err.context(format!("task worker {thread_name}"));
539                }),
540            )?;
541
542            self.reporter().set_task_status(thread_id, TaskStatus::Idle);
543        }
544
545        Ok(())
546    }
547
548    fn log_progress(&self) -> Result<()> {
549        let reporter = self.reporter();
550        reporter.set_scenario_lines(self.steps().len());
551        let total_backordered = self.total_backordered();
552        reporter.set_backorders(total_backordered);
553        let total_pending = self.pending_orders().values().map(|queue| queue.len()).sum();
554        reporter.set_pending_orders(total_pending);
555        if total_backordered > 100_000 {
556            // Categorize by product ID and sort by the amount.
557            let backorders = self.backorders();
558            let mut bo = backorders.iter().enumerate().collect::<Vec<_>>();
559            bo.sort_by(|a, b| b.1.len().cmp(&a.1.len()));
560            let product_id = bo[0].0;
561            self.set_track_product(product_id);
562            let product = &self.products()[product_id];
563            reporter.out(&format!(
564                "Top backordered product {}: {} orders:\n  Daily quotient: {:.2}, daily estimate: {:.2}, expected return rate: {:.2}\n  Stock supplies in: {:.2}, supplier inaccuracy: {:.2}, supplier tardiness: {:.2}",
565                product.id(),
566                bo[0].1.len(),
567                product.daily_quotient(),
568                product.daily_estimate(),
569                product.expected_return_rate(),
570                product.stock_supplies_in(),
571                product.supplier_inaccuracy(),
572                product.supplier_tardiness(),
573            ))?;
574        }
575        else {
576            self.clear_track_product();
577        }
578
579        let mut check_steps = vec![];
580        for inv_rec in self.inventory().iter() {
581            check_steps.push(Step::CheckInventory(InventoryCheck::new(
582                inv_rec.product_id(),
583                inv_rec.stock(),
584                "daily",
585            )));
586        }
587        self.add_steps(check_steps)?;
588
589        // if total_pending > 50_000 {
590        //     reporter.out(format!(
591        //         "Pending orders: {}",
592        //         self.pending_orders()
593        //             .iter()
594        //             .map(|(day, queue)| format!("{}: {}", day, queue.len()))
595        //             .collect::<Vec<_>>()
596        //             .join(", ")
597        //     ))?;
598        // }
599        self.reporter().refresh_report()
600    }
601
602    #[inline(always)]
603    fn adjust_capacity<T>(&self, steps: &mut Vec<T>, delta: usize) {
604        let step_count = steps.len();
605        if (step_count + delta) >= steps.capacity() {
606            // At least double the capacity.
607            steps.reserve(step_count);
608            // eprintln!("Step capacity increased from {} to {}", step_count, steps.capacity());
609        }
610    }
611
612    fn process_order(&self, mut step: Step) -> Result<(Step, AddPostProc)> {
613        let order = match step {
614            Step::AddOrder(ref mut order) | Step::UpdateOrder(ref mut order) => order,
615            _ => return Err(simerr!("Non-order step submitted for order processing: {:?}", step)),
616        };
617
618        let mut post_proc = AddPostProc::None;
619
620        if order.status == OrderStatus::New || order.status == OrderStatus::Recheck {
621            let new_order = order.status == OrderStatus::New;
622            let mut inventory = self.inventory_mut();
623            let Some(inv_rec) = inventory.get_mut(order.product_id as usize)
624            else {
625                return Err(simerr!("Product {} not found in inventory", order.product_id));
626            };
627
628            if inv_rec.stock() < order.quantity as i64 {
629                // Not enough stock, mark the order as backordered.
630                order.status = OrderStatus::Backordered;
631                if new_order {
632                    post_proc = AddPostProc::BackorderNew(order.product_id);
633                }
634                else {
635                    post_proc = AddPostProc::BackorderAgain(order.product_id);
636                }
637            }
638            else {
639                order.status = OrderStatus::Pending;
640                if inv_rec.handling_days() > 0 {
641                    // If this inventory entry requires extra processing then the order goes into the pending queue.
642                    post_proc = AddPostProc::Pending(self.current_day() + inv_rec.handling_days() as i32);
643                    // self.pending_orders_mut()
644                    //     .entry(self.current_day() + inv_rec.handling_days() as i32)
645                    //     .or_default()
646                    //     .push_back(order.clone());
647                }
648                else {
649                    // Same-day shipping: a pending step is still needed because the warehouse must process the request
650                    // before dispatch, so a post-processing step is added to proceed with the shipment.
651                    let mut shipping_order = order.clone();
652                    shipping_order.status = OrderStatus::Shipped;
653                    post_proc = AddPostProc::Shipping(shipping_order);
654                }
655                // Either way, the stock gets reduced.
656                *inv_rec.stock_mut() -= order.quantity as i64;
657            }
658        }
659
660        Ok((step, post_proc))
661    }
662
663    #[inline]
664    fn add_step(&self, step: Step) -> Result<()> {
665        self.add_steps([step])
666    }
667
668    fn add_steps<S: IntoIterator<Item = Step>>(&self, new_steps: S) -> Result<()> {
669        let mut steps = self.steps_mut();
670        self.adjust_capacity(&mut *steps, 10_000);
671
672        // List of backordered orders as a tuple of product ID and the position in the steps vector.
673        let mut backordered_new: Vec<(i32, usize)> = Vec::new();
674        let mut backordered_again: Vec<(i32, usize)> = Vec::new();
675        // List of pending orders as a tuple of a day to ship and the position in the steps vector.
676        let mut pending_orders: Vec<(i32, usize)> = Vec::new();
677
678        for mut step in new_steps.into_iter() {
679            let mut post_proc = AddPostProc::None;
680            match step {
681                Step::AddOrder(_) | Step::UpdateOrder(_) => {
682                    (step, post_proc) = self.process_order(step)?;
683                }
684                _ => (),
685            }
686
687            steps.push(step);
688            let step_idx = steps.len() - 1;
689
690            match post_proc {
691                AddPostProc::Pending(day) => {
692                    pending_orders.push((day, step_idx));
693                }
694                AddPostProc::BackorderNew(product_id) => {
695                    backordered_new.push((product_id, step_idx));
696                }
697                AddPostProc::BackorderAgain(product_id) => {
698                    backordered_again.push((product_id, step_idx));
699                }
700                AddPostProc::Shipping(shipping_order) => {
701                    // let inv_rec = inventory.get_mut(shipping_order.product_id as usize).unwrap();
702                    // steps.push(Step::CheckInventory(InventoryCheck::new(
703                    //     shipping_order.product_id,
704                    //     inv_rec.stock(),
705                    //     "same day shipping",
706                    // )));
707                    steps.push(Step::UpdateOrder(shipping_order));
708                }
709                AddPostProc::None => (),
710            }
711        }
712
713        if !pending_orders.is_empty() {
714            let mut pendings = self.pending_orders_mut();
715            for (day, step_idx) in pending_orders {
716                pendings.entry(day).or_default().push(step_idx);
717            }
718        }
719
720        if !(backordered_new.is_empty() && backordered_again.is_empty()) {
721            let mut backorders = self.backorders_mut();
722            for (product_id, step_idx) in backordered_again {
723                backorders[product_id as usize].push_front(step_idx);
724            }
725            for (product_id, step_idx) in backordered_new {
726                backorders[product_id as usize].push_back(step_idx);
727            }
728        }
729
730        self.reporter().set_scenario_lines(steps.len());
731        self.reporter().set_scenario_capacity(steps.capacity());
732
733        Ok(())
734    }
735
736    /// Register our products and initialize related structures: backorders and ivnentory.
737    fn init_products(&self) -> Result<()> {
738        let mut products = self.products_mut();
739        let random_pool = self.random_pool();
740        let price_model = self.product_price_model();
741        let product_count = self.product_count() as usize;
742        let geometric = Geometric::new(0.7).unwrap();
743        let mut rng = rand::rng();
744
745        products.reserve(product_count);
746        self.backorders_mut().resize(product_count, VecDeque::new());
747
748        for id in 0..self.product_count() {
749            let price = price_model.next_price();
750            let daily_quotient = random_pool.next_rand();
751            // Limit the return rate to 30% to avoid too much volatility
752            let expected_return_rate = random_pool.next_rand() * 0.3;
753            // Let's be optimistic and assume that expected shipment terms are between 3 and 21 days. 3 is OK, but 21 in
754            // the real world sometimes is an optimistic expectation. Though, with a 50% chance, it arrives later,
755            // making things more realistic.
756            let ship_terms = 3.0 + random_pool.next_rand() * 18.0;
757            // Make it random in the range of 5% to 20%.
758            let supplier_inaccuracy = random_pool.next_rand() * 0.15 + 0.05;
759            // Make it random in the range of -0.9 to 0.7.
760            let supplier_tardiness = random_pool.next_rand() * 1.6 - 0.9;
761            let product = Product::builder()
762                .id(id)
763                .name(format!("Product {id}"))
764                .price(price)
765                .daily_quotient(daily_quotient)
766                .expected_return_rate(expected_return_rate)
767                .stock_supplies_in(ship_terms)
768                .supplier_inaccuracy(supplier_inaccuracy)
769                .supplier_tardiness(supplier_tardiness)
770                .product_model(self.product_price_model().clone())
771                .build()
772                .unwrap();
773            let step = Step::AddProduct(product.clone().into());
774            self.add_step(step)?;
775
776            products.push(product);
777
778            // The distribution gives 1.. values, so we need to subtract 1 to get the range of 0...
779            let handling_days = geometric.sample(&mut rng).min(255) as i16;
780            let inv_rec = InventoryRecord::new(id, 0, handling_days);
781            self.inventory_mut().push(inv_rec.clone());
782            self.add_step(Step::AddInventoryRecord(inv_rec.into()))?;
783        }
784
785        Ok(())
786    }
787
788    fn register_customers(&self) -> Result<()> {
789        let new_customers = self.next_day_customers();
790
791        for _ in 0..new_customers {
792            let customer = self.random_pool().next_customer();
793            self.add_step(Step::AddCustomer(customer.clone().into()))?;
794            if self.customers().contains_key(customer.email()) {
795                panic!("Customer with email {} already exists", customer.email());
796            }
797            self.customers_mut()
798                .insert(customer.email().clone(), Arc::new(customer));
799        }
800
801        if self.current_day() < self.period {
802            let next_cust = self.customer_model().expected_customers(self.current_day() + 1).round() as usize
803                - self.customers().len();
804            self.set_next_day_customers(next_cust);
805
806            // Shift the customer counts history window.
807            let mut customer_counts = self.customer_counts_mut();
808            while customer_counts.len() >= self.customer_history_length() {
809                customer_counts.pop_front();
810            }
811            customer_counts.push_back(self.customers().len() as u32);
812        }
813
814        Ok(())
815    }
816
817    fn customer_login(&self, customer_id: i32) -> Result<()> {
818        let sess_idx = customer_id as usize - 1;
819        if let Some(existing_session) = &mut self.customer_sessions_mut()[sess_idx] {
820            if existing_session.expires_on >= self.current_day() {
821                // Session is still valid, no need to create a new one but extend the expiration date.
822                existing_session.expires_on = self.current_day() + LOGIN_EXPIRE_DAYS;
823                self.add_step(Step::UpdateSession(existing_session.clone()))?;
824                return Ok(());
825            }
826        }
827
828        // A new session.
829        let session = DbSession {
830            id:          self.next_session_id(),
831            customer_id: Some(customer_id),
832            expires_on:  self.current_day() + LOGIN_EXPIRE_DAYS,
833        };
834        self.customer_sessions_mut()[customer_id as usize - 1] = Some(session.clone());
835        self.add_step(Step::AddSession(session))?;
836        Ok(())
837    }
838
    /// Plans supplier shipments for every product based on the expected demand.
    ///
    /// For each product the expected sales are estimated as
    /// `total_backordered() + round(daily_estimate * customers * anticipated_growth * supplies_in)`, where
    /// `anticipated_growth` is the current customer count divided by the oldest entry of the customer count
    /// history (or by the model's initial customer count when the history is empty). A shipment is ordered
    /// for the part of the estimate that is covered neither by units already being shipped nor, unless
    /// `initial` is set, by the current stock.
    ///
    /// When `initial` is `true` the shipments arrive on day 1 and the current stock is not consulted;
    /// otherwise they arrive `ceil(supplies_in)` days after the current day.
    ///
    /// # Errors
    ///
    /// Propagates reporter failures when the state of the tracked product is printed.
    fn order_shipments(&self, initial: bool) -> Result<()> {
        // NOTE(review): presumably this resets a cached total so that `total_backordered()` below is
        // recomputed from the current backorder queues; confirm against the field declaration.
        self.clear_total_backordered();

        let customer_count = self.customers().len() as f64;
        let products = self.products();
        let inventory = self.inventory();
        let mut shipments = self.shipments_mut();
        // Ratio of the current customer count to the oldest count in the history window.
        let anticipated_growth = {
            let base = self
                .customer_counts()
                .front()
                .map(|rec| *rec as f64)
                .unwrap_or(self.customer_model().initial_customers());
            self.customers().len() as f64 / base
        };

        'inv_record: for product in products.iter() {
            let product_id = product.id();

            // A more realistic model would take into account the supplier's tardiness and inaccuracies, but we want to
            // test stock outages as well. Still, we attempt to account for the expected customer base growth, albeit
            // very simplistically, by multiplying the current number of customers by the growth factor computed above.
            let estimate_sales = self.total_backordered() as i32
                + (product.daily_estimate() * customer_count * anticipated_growth * product.supplies_in()).round()
                    as i32;
            // Units of this product that are already ordered but have not arrived yet; starts at 0.
            let being_shipped = *self.product_shipping_mut().entry(product_id).or_insert(0);

            if let Some(tracked_product) = self.track_product() {
                if tracked_product == product_id as usize {
                    self.reporter().out(&format!(
                        "Product {}: being shipped: {}, estimate sales: {}, inventory: {}",
                        product_id,
                        being_shipped,
                        estimate_sales,
                        inventory.get(tracked_product).map_or(0, |rec| rec.stock())
                    ))?;
                }
            }

            // Nothing to order when the shipments under way already exceed the demand or there is no demand.
            if being_shipped > estimate_sales || estimate_sales == 0 {
                continue 'inv_record;
            }

            let mut batch_size = estimate_sales - being_shipped;

            let arrives_on = if initial {
                // Initial shipments are all scheduled for day 1, regardless of the supplier's terms.
                1
            }
            else {
                if let Some(inv_rec) = inventory.get(product_id as usize) {
                    // If we still have enough stock, we don't need to order anything.
                    if inv_rec.stock() > batch_size as i64 {
                        continue 'inv_record;
                    }
                    // Safe because we checked the stock above.
                    batch_size -= inv_rec.stock() as i32;
                }
                // We ceil-round the value because even the small
                // fraction of a day counts towards the entire day.
                self.current_day() + product.supplies_in().ceil().to_i32().unwrap()
            };

            if batch_size > 0 {
                let shipment = Shipment::builder()
                    .product_id(product_id)
                    .batch_size(batch_size)
                    .arrives_on(arrives_on)
                    .build()
                    .unwrap();

                if let Some(tracked_product) = self.track_product() {
                    if tracked_product == product_id as usize {
                        self.reporter().out(&format!(
                            "Order shipment for product {}: {} units, arrives in {} days",
                            product_id,
                            shipment.batch_size(),
                            shipment.arrives_on() - self.current_day()
                        ))?;
                    }
                }

                // Account for the new batch as being shipped and schedule it under its arrival day.
                *self.product_shipping_mut().get_mut(&product_id).unwrap() += shipment.batch_size();
                shipments.entry(arrives_on).or_default().push(shipment);
            }
        }

        Ok(())
    }
927
    /// Takes all shipments arriving on the current day, replenishes the inventory with them and removes
    /// them from the shipment schedule.
    ///
    /// For every arrived shipment the product's stock is increased, a `Step::AddStock` is queued and the
    /// count of units being shipped is decreased. Afterwards the backorder queue of every affected
    /// product is walked from the front: each order whose quantity fits into the product's stock is
    /// removed from the queue, marked `OrderStatus::Recheck` and queued as a `Step::UpdateOrder`; the walk
    /// stops at the first order that does not fit. All queued steps are added to the script at the end.
    ///
    /// # Errors
    ///
    /// Fails if the reporter fails, if a backordered step index does not resolve to an order, if a
    /// backordered order belongs to a product other than the one its queue is for, or if the steps cannot
    /// be added to the script.
    ///
    /// # Panics
    ///
    /// Panics if an arrived shipment refers to a product that has no product entry, no inventory record
    /// or no being-shipped counter.
    fn replenish_inventory(&self) -> Result<()> {
        let mut all_shipments = self.shipments_mut();
        let products = self.products();
        // Steps are collected locally and submitted in one go at the end of the method.
        let mut inv_steps = Vec::new();

        // self.report(
        //     "I ".to_owned()
        //         + &self
        //             .inventory()
        //             .iter()
        //             .enumerate()
        //             .map(|(id, rec)| format!("{}: {:>8}", id, rec.stock()))
        //             .collect::<Vec<_>>()
        //             .join("|"),
        // )?;

        // self.reporter().out(
        //     "B ".to_owned()
        //         + &self
        //             .backorders()
        //             .iter()
        //             .enumerate()
        //             .map(|(id, orders)| format!("{}: {:>8}", id, orders.len()))
        //             .collect::<Vec<_>>()
        //             .join("|"),
        // )?;

        if let Some(shipments) = all_shipments.remove(&self.current_day()) {
            let mut inventory = self.inventory_mut();
            // Flags, indexed by product ID, of the products that received stock today.
            let mut affected_products = vec![false; self.product_count() as usize];
            for shipment in shipments {
                let product = products.get(shipment.product_id() as usize).unwrap();
                let product_id = product.id() as usize;
                let batch_size = shipment.batch_size();
                let inv_rec = inventory.get_mut(product_id).unwrap();

                affected_products[product_id] = true;

                // inv_steps.push(Step::CheckInventory(InventoryCheck::new(
                //     shipment.product_id(),
                //     inv_rec.stock(),
                //     "pre-incoming shipment",
                // )));

                let new_stock = inv_rec.stock() + batch_size as i64;
                *inv_rec.stock_mut() = new_stock;

                if let Some(tracked_product) = self.track_product() {
                    if tracked_product == product_id {
                        self.reporter().out(&format!(
                            "Arrived shipment for product {product_id}: {batch_size} units, new stock: {new_stock}",
                        ))?;
                    }
                }

                // The executor doesn't need to see the ordered shipment. We only inform it to replenish the inventory.
                // It wouldn't also be involved with processing backorders – only execute order changes where it will
                // actually decrease inventory stock.
                inv_steps.push(Step::AddStock(IncomingShipment {
                    product_id: shipment.product_id(),
                    batch_size: shipment.batch_size(),
                }));

                // inv_steps.push(Step::CheckInventory(InventoryCheck::new(
                //     shipment.product_id(),
                //     new_stock,
                //     "post-incoming shipment",
                // )));

                // self.report(format!(
                //     "[day {}] Arrived shipment for product {}: {} units",
                //     self.current_day(),
                //     product_id,
                //     batch_size
                // ))?;

                // The batch has arrived, so it no longer counts as being shipped.
                *self.product_shipping_mut().get_mut(&product.id()).unwrap() -= shipment.batch_size();

                // // Put what's left of the arrived batch after fulfilling backorders into the inventory.
                // // Tell the executor to match the inventory with the expected value. This is useful for testing purposes
                // // to ensure that scenario is in sync with the simulation.
                // inv_steps.push(self.new_step(StepAction::CheckInventory {
                //     product_id: product_id as u32,
                //     stock:      new_stock,
                // }));
            }

            // With the stock updated, release the backorders of the replenished products.
            for (product_id, updated) in affected_products.iter().enumerate() {
                if *updated {
                    let mut backorders = self.backorders_mut();

                    if let Some(orders) = backorders.get_mut(product_id) {
                        // if orders.len() > 0 {
                        //     self.report(format!(
                        //         "Processing {} backorders for product {}",
                        //         orders.len(),
                        //         product_id
                        //     ))?;
                        // }
                        // The stock is not decreased while orders are released, so every order is compared
                        // against the full post-arrival stock.
                        // NOTE(review): presumably the recheck of the order deals with actual availability; confirm.
                        let inv_rec = &inventory[product_id];

                        while !orders.is_empty() {
                            // The queue holds script step indices; fetch a copy of the order behind the first one.
                            let mut order = self.order_by_idx(*orders.front().unwrap())?;

                            if order.product_id != product_id as i32 {
                                return Err(simerr!(
                                    "Backorder {} product ID {} does not match the shipment's product ID {}",
                                    order.id,
                                    order.product_id,
                                    product_id
                                ));
                            }

                            if order.quantity as i64 <= inv_rec.stock() {
                                // The order can be fulfilled, remove it from the backorder queue.
                                let _ = orders.pop_front().unwrap();
                                order.status = OrderStatus::Recheck;
                                // Submit backordered order for processing.
                                inv_steps.push(Step::UpdateOrder(order));
                            }
                            else {
                                // Not enough stock to fulfill the order, leave it in the backorder queue.
                                break;
                            }
                        }
                    }
                }
            }
        }

        // self.report(
        //     "I ".to_owned()
        //         + &self
        //             .inventory()
        //             .iter()
        //             .enumerate()
        //             .map(|(id, rec)| format!("{}: {:>8}", id, rec.stock()))
        //             .collect::<Vec<_>>()
        //             .join("|"),
        // )?;

        self.add_steps(inv_steps)
    }
1072
1073    fn fulfill_pending(&self) -> Result<()> {
1074        let mut pending_orders = self.pending_orders_mut();
1075
1076        let Some(indicies) = pending_orders.remove(&self.current_day())
1077        else {
1078            return Ok(());
1079        };
1080
1081        // Use batches of at least 1000 orders to minimize task submission overhead. However, do no more than
1082        // worker_count() tasks at once, as that ensures optimal CPU utilization.
1083        let order_count = indicies.len();
1084        let batch_count = (order_count / 1000 + 1).min(self.worker_count());
1085        let batch_size = order_count / batch_count;
1086
1087        let mut tasks = vec![];
1088
1089        for idx_chunk in indicies.chunks(batch_size) {
1090            tasks.push(self.submit_task(TaskData::Pending(idx_chunk.to_vec()))?);
1091        }
1092
1093        for task in tasks {
1094            task.wait()?;
1095        }
1096
1097        Ok(())
1098    }
1099
1100    fn place_orders(&self) -> Result<()> {
1101        let customers = self.customers();
1102        let products = self.products();
1103
1104        // First, calculate the current expected sale rate per product.
1105        let total_sale_rate: f64 = products.iter().map(|p| p.daily_estimate()).sum();
1106        let prod_sale_rate: Arc<Vec<(_, _)>> = Arc::new(
1107            products
1108                .iter()
1109                .map(|p| (p.id(), p.daily_estimate() / total_sale_rate))
1110                .collect(),
1111        );
1112
1113        let mut task_results = Vec::new();
1114
1115        let cust_count = customers.len();
1116        let task_count = self.worker_count();
1117        let batch_size = (cust_count / task_count) + (cust_count % task_count > 0) as usize;
1118
1119        for cust_chunk in customers.values().cloned().collect::<Vec<_>>().chunks(batch_size) {
1120            let cust_chunk = cust_chunk.to_vec();
1121
1122            let task = PurchaseTask {
1123                customers:         cust_chunk,
1124                product_interests: prod_sale_rate.clone(),
1125            };
1126
1127            task_results.push(self.submit_task(TaskData::Purchase(task))?);
1128        }
1129
1130        let mut succeed = true;
1131        for tr in task_results {
1132            if let Err(err) = tr.wait() {
1133                self.reporter().out(
1134                    &style(format!("Failed to process task: {err}"))
1135                        .red()
1136                        .bright()
1137                        .to_string(),
1138                )?;
1139                succeed = false;
1140            }
1141        }
1142
1143        if succeed {
1144            Ok(())
1145        }
1146        else {
1147            Err(simerr!("Order processing was not successful"))
1148        }
1149    }
1150
1151    fn process_returns(&self) -> Result<()> {
1152        let task_result = self.submit_task(TaskData::Returns)?;
1153
1154        task_result.wait()
1155    }
1156
1157    #[allow(unused)]
1158    fn report<S: Display>(&self, msg: S) -> Result<()> {
1159        self.reporter().out(&msg.to_string())
1160    }
1161
1162    fn order_by_idx(&self, idx: usize) -> Result<DbOrder> {
1163        let steps = self.steps();
1164        let Some(step) = steps.get(idx)
1165        else {
1166            return Err(simerr!("Step index {} not found", idx));
1167        };
1168        Ok(match step {
1169            Step::AddOrder(ref order) => order,
1170            Step::UpdateOrder(ref order) => order,
1171            _ => Err(simerr!("Script step at index {} is not an order", idx))?,
1172        }
1173        .clone())
1174    }
1175
1176    pub fn customer_model(&self) -> &CustomerModel {
1177        self.customer_model.as_ref().expect("Customer model not initialized")
1178    }
1179
1180    fn setup_customer_model(&mut self) {
1181        self.customer_model = Some(
1182            CustomerModel::builder()
1183                .initial_customers(self.initial_customers() as f64)
1184                .market_capacity(self.market_capacity() as f64)
1185                .inflection_point(self.inflection_point() as f64)
1186                .growth_rate(self.growth_rate())
1187                .build()
1188                .unwrap(),
1189        );
1190    }
1191
1192    fn build_worker_count(&self) -> usize {
1193        // Leave one core for the main thread, if possible.
1194        let threads = (num_cpus::get()).max(1);
1195        self.reporter().set_task_count(threads);
1196        threads
1197    }
1198
1199    fn build_task_tx(&self) -> Sender<Task> {
1200        // Leave one core for the main thread, if possible.
1201        let threads = self.worker_count();
1202        let (tx, rx) = crossbeam::channel::unbounded();
1203        for thread_id in 0..threads {
1204            let rx = rx.clone();
1205            let myself = self.myself().unwrap();
1206            let task_handler = thread::Builder::new()
1207                .name(format!("task_worker_{thread_id}"))
1208                .spawn(move || {
1209                    let mut retry = 1;
1210                    while let Err(err) = myself.task_worker(thread_id, rx.clone()) {
1211                        eprintln!("Task worker thread {thread_id} failed [{retry}]: {err:?}");
1212                        retry += 1;
1213                        if retry > 3 {
1214                            eprintln!("Task worker thread {thread_id} failed too many times, exiting");
1215                            break;
1216                        }
1217                    }
1218                    thread_id
1219                })
1220                .unwrap();
1221            self.task_handlers_mut().push(task_handler);
1222        }
1223
1224        tx
1225    }
1226
1227    fn build_reporter(&self) -> Reporter {
1228        if self.quiet() {
1229            Reporter::Quiet
1230        }
1231        else {
1232            Reporter::Formatted(child_build!(self, FormattedReporter).unwrap())
1233        }
1234    }
1235
1236    fn build_random_pool(&self) -> Arc<RndPool> {
1237        child_build!(
1238            self,
1239            RndPool {
1240                min_customer_orders: self.min_customer_orders,
1241                max_customer_orders: self.max_customer_orders,
1242                customers_full:      100,
1243            }
1244        )
1245        .unwrap()
1246    }
1247
1248    fn build_return_probs(&self) -> BTreeMap<i32, f64> {
1249        let mut return_probs = BTreeMap::new();
1250        let return_window = self.return_window() as f64;
1251        for product in self.products().iter() {
1252            return_probs.insert(product.id(), product.expected_return_rate() / return_window);
1253        }
1254        return_probs
1255    }
1256
1257    fn build_total_backordered(&self) -> usize {
1258        self.backorders().iter().map(|bo| bo.len()).sum()
1259    }
1260
1261    fn build_product_price_model(&self) -> Arc<ProductModel> {
1262        ProductModel::builder().build().unwrap()
1263    }
1264
1265    fn build_customer_sessions(&self) -> Vec<Option<DbSession>> {
1266        vec![None; self.market_capacity() as usize]
1267    }
1268
1269    fn build_session_base(&self) -> i64 {
1270        self.current_day() as i64 * 10i64.pow((self.market_capacity() as i64 * SESSION_FACTOR as i64).ilog10() + 1)
1271    }
1272}