wb_cache/test/simulation/scriptwriter/
rnd_pool.rs

1use std::thread;
2
3use crossbeam::queue::SegQueue;
4use crossbeam::sync::Parker;
5use crossbeam::sync::Unparker;
6use fake::Fake;
7use fieldx_plus::fx_plus;
8use rand::rngs::ThreadRng;
9use rand::Rng;
10use rand_distr::Distribution;
11use rand_distr::Gamma;
12use statrs::distribution::ContinuousCDF;
13use statrs::distribution::Gamma as StatrsGamma;
14
15use crate::test::simulation::types::simerr;
16use crate::test::simulation::types::SimErrorAny;
17
18use super::entity::customer::Customer;
19use super::math::bisect;
20use super::reporter::Reporter;
21use super::reporter::SwReporter;
22use super::ScriptWriter;
23
/// Pool of pre-generated random values and customers.
///
/// Consumers take items with the `next_*` methods. A background thread (see `start`) keeps the
/// queues filled and sleeps on a `Parker` between refills; it is woken through `generator_unparker`.
#[fx_plus(
    child(ScriptWriter, unwrap(or_else(SimErrorAny, no_scenario))),
    rc,
    sync,
    default(off)
)]
pub struct RndPool {
    /// Low-water mark of `randoms`: a consumer that sees fewer items than this wakes the generator.
    /// Doubled by `replenish` whenever it finds the queue completely empty.
    #[fieldx(lock, get(copy), get_mut, default(500))]
    randoms_min: usize,

    /// Fill level up to which `replenish` tops `randoms` up.
    /// Doubled by `replenish` whenever it finds the queue completely empty.
    #[fieldx(lock, get(copy), get_mut, default(1000))]
    randoms_full: usize,

    /// Queue of pre-generated random numbers drawn from `0.0..=1.0`.
    #[fieldx(get, get_mut, default(SegQueue::new()))]
    randoms: SegQueue<f64>,

    /// Low-water mark of `customers`: a consumer that sees fewer items than this wakes the generator.
    /// Doubled by `replenish` whenever it finds the queue completely empty.
    #[fieldx(lock, get(copy), get_mut, default(50))]
    customers_min: usize,

    /// Fill level up to which `replenish` tops `customers` up.
    /// Doubled by `replenish` whenever it finds the queue completely empty.
    #[fieldx(lock, get(copy), get_mut, set, default(100))]
    customers_full: usize,

    // Expected customer orders per day parameters.
    /// Offset added to every sampled orders-per-day value, i.e. the lowest value a customer can get.
    #[fieldx(get(copy), default(0.15))]
    min_customer_orders:    f64,
    /// This value is not a hard upper limit but the boundary within which min_max_order_fraction of randomly generated
    /// customer expected daily orders will fall. (1 - min_max_order_fraction) of all customers may get a higher value.
    #[fieldx(get(copy), default(3.))]
    max_customer_orders:    f64,
    /// Fraction of customers whose expected daily orders are meant to fall within
    /// `min_customer_orders..max_customer_orders`.
    #[fieldx(get(copy), default(0.95))]
    min_max_order_fraction: f64,
    /// Shape parameter of the gamma distribution used for orders per day.
    #[fieldx(copy, default(2.0))]
    orders_gamma_shape:     f64,

    // Erratic quotient parameters.
    /// Similarly to max_customer_orders, this value is not a hard upper limit.
    #[fieldx(get(copy), default(3.))]
    max_erratic_quotient: f64,
    /// Fraction of erratic quotient values to fall into 0..max_erratic_quotient range.
    // NOTE(review): `build_erratic_quotient_gamma` solves for the bound `max_erratic_quotient - 1.0`, while
    // `new_customer` uses the sampled value without any offset. Confirm which range is intended.
    #[fieldx(get(copy), default(0.9))]
    eq_fraction:          f64,
    /// Shape parameter of the gamma distribution used for the erratic quotient.
    #[fieldx(get(copy), default(2.))]
    eq_gamma_shape:       f64,

    /// Tolerance handed to `bisect` when solving for the gamma parameters; the builders also pass it as the
    /// lower end of the search interval.
    #[fieldx(copy, default(0.00001))]
    tolerance: f64,

    /// Queue of pre-generated customers.
    #[fieldx(lock, get, get_mut, default(SegQueue::new()))]
    customers: SegQueue<Customer>,

    /// Id of the most recently generated customer; incremented before each new customer is built.
    #[fieldx(lock, get(copy), get_mut, default(0))]
    customer_id: i32,

    /// Handle used to wake the generator thread. Built lazily; building it spawns the thread.
    #[fieldx(lazy, get)]
    generator_unparker: Unparker,

    /// Lazily built distribution for the expected orders per day (before `min_customer_orders` is added).
    #[fieldx(lazy)]
    orders_gamma: Gamma<f64>,

    /// Lazily built distribution for the customer erratic quotient.
    #[fieldx(lazy)]
    erratic_quotient_gamma: Gamma<f64>,

    /// Reporter cloned from the parent `ScriptWriter` on first use.
    #[fieldx(lazy)]
    reporter: Reporter,

    /// Set when the pool is dropped; the generator thread checks it before every refill.
    #[fieldx(lock, private, get(copy), set, builder(off))]
    shutdown: bool,
}
94
/// Generates the `next_<method>()` accessors of the pool.
///
/// Every `method, name, type;` entry expands to `pub fn next_<method>(&self) -> type`, which pops one
/// item from the queue returned by `self.<name>()`. The surrounding type must provide `<name>()`,
/// `<name>_min()` and `generator_unparker()`.
///
/// The generated method never fails: when the queue is empty it keeps waking the generator and
/// retries until an item becomes available.
macro_rules! next_method {
    ( $( $method:ident, $name:ident, $type:ty );+ $(;)? ) => {
        $( paste::paste! {
            #[inline(always)]
            pub fn [<next_ $method>](&self) -> $type {
                if self.$name().len() < self.[<$name _min>]() {
                    // Below the low-water mark: ask the generator to top the pool up in the background.
                    self.generator_unparker().unpark();
                }
                loop {
                    if let Some(item) = self.$name().pop() {
                        return item;
                    }
                    // The pool is drained. Wake the generator again (an earlier wake-up may have been
                    // consumed before the queue ran dry) and give up the time slice instead of spinning
                    // hot, so the generator thread is not starved of CPU while it refills the queue.
                    self.generator_unparker().unpark();
                    ::std::thread::yield_now();
                }
            }
        } )+
    };
}
117
118impl RndPool {
119    next_method! {
120        rand, randoms, f64;
121        customer, customers, Customer;
122    }
123
124    // fn build_tx(&self) -> Sender<bool> {
125    //     let (tx, rx) = std::sync::mpsc::channel();
126    //     self.start(rx);
127    //     tx
128    // }
129
130    fn build_generator_unparker(&self) -> Unparker {
131        let parker = Parker::new();
132        let unparker = parker.unparker().clone();
133        self.start(parker);
134        unparker
135    }
136
137    fn solve_gamma_scale(&self, shape: f64, min_scale: f64, max_scale: f64, fraction: f64, upper_bound: f64) -> f64 {
138        bisect(min_scale, max_scale, fraction, self.tolerance, |scale| {
139            let gamma = StatrsGamma::new(shape, scale).unwrap();
140            let min = gamma.cdf(0.0);
141            let max = gamma.cdf(upper_bound);
142            max - min
143        })
144        .expect("failed to bisect gamma distribution scale")
145    }
146
147    fn build_orders_gamma(&self) -> Gamma<f64> {
148        Gamma::new(
149            self.orders_gamma_shape,
150            self.solve_gamma_scale(
151                self.orders_gamma_shape,
152                self.tolerance,
153                100.0,
154                self.min_max_order_fraction(),
155                self.max_customer_orders() - self.min_customer_orders(),
156            ),
157        )
158        .unwrap()
159    }
160
161    fn build_erratic_quotient_gamma(&self) -> Gamma<f64> {
162        Gamma::new(
163            self.eq_gamma_shape,
164            self.solve_gamma_scale(
165                self.eq_gamma_shape,
166                self.tolerance,
167                100.0,
168                self.eq_fraction(),
169                self.max_erratic_quotient() - 1.0,
170            ),
171        )
172        .unwrap()
173    }
174
175    fn build_reporter(&self) -> Reporter {
176        self.parent().unwrap().reporter().clone()
177    }
178
179    pub fn start(&self, parker: Parker) {
180        let me = self.myself().unwrap();
181        thread::Builder::new()
182            .name("rnd_pool".into())
183            .spawn(move || loop {
184                let mut rng = rand::rng();
185                while !me.shutdown() {
186                    me.replenish(&mut rng);
187                    parker.park();
188                }
189            })
190            .unwrap();
191    }
192
193    fn no_scenario(&self) -> SimErrorAny {
194        simerr!("Scenario object is gone!")
195    }
196
197    fn replenish(&self, rng: &mut ThreadRng) {
198        let mut incomplete = true;
199        self.reporter()
200            .set_rnd_pool_task_status(super::reporter::TaskStatus::Busy);
201
202        if self.randoms().is_empty() {
203            let _ = self.reporter().out(&format!(
204                "Doubling randoms pool size from {}/{}",
205                self.randoms_min(),
206                self.randoms_full()
207            ));
208            *self.randoms_full_mut() *= 2;
209            *self.randoms_min_mut() *= 2;
210        }
211
212        if self.customers().is_empty() {
213            let _ = self.reporter().out(&format!(
214                "Doubling customers pool size from {}/{}",
215                self.customers_min(),
216                self.customers_full()
217            ));
218            *self.customers_full_mut() *= 2;
219            *self.customers_min_mut() *= 2;
220        }
221
222        while incomplete {
223            incomplete = false;
224            if self.randoms().len() < self.randoms_full() {
225                self.randoms().push(rng.random_range(0.0..=1.0));
226                incomplete = true;
227            }
228
229            if self.customers().len() < self.customers_full() {
230                let customer = self.new_customer(rng);
231                self.customers().push(customer);
232                incomplete = true;
233            }
234        }
235        self.reporter()
236            .set_rnd_pool_task_status(super::reporter::TaskStatus::Idle);
237    }
238
239    fn new_customer(&self, rng: &mut ThreadRng) -> Customer {
240        *self.customer_id_mut() += 1;
241        let email = format!(
242            "{}{}@{}.{}",
243            fake::faker::lorem::en::Word().fake::<String>(),
244            self.customer_id(),
245            fake::faker::lorem::en::Word().fake::<String>(),
246            fake::faker::internet::en::DomainSuffix().fake::<String>(),
247        );
248        Customer::builder()
249            .id(self.customer_id())
250            .first_name(fake::faker::name::en::FirstName().fake())
251            .last_name(fake::faker::name::en::LastName().fake())
252            .email(email.clone())
253            .erratic_quotient(self.erratic_quotient_gamma().sample(rng))
254            .orders_per_day(self.orders_gamma().sample(rng) + self.min_customer_orders())
255            .build()
256            .unwrap()
257    }
258}
259
/// Asks the generator thread to stop when the pool goes away.
impl Drop for RndPool {
    fn drop(&mut self) {
        // The flag must be set before the wake-up: the generator checks it as soon as it leaves `park()`.
        self.set_shutdown(true);
        // NOTE(review): `generator_unparker` is a lazy field. If the generator was never started, this
        // call runs its builder, which calls `start()` and therefore `myself()`, while the pool is
        // already being dropped. Verify that this path cannot panic.
        self.generator_unparker().unpark();
    }
}
265}