// wb_cache/test/simulation/scriptwriter/rnd_pool.rs
use std::thread;
2
3use crossbeam::queue::SegQueue;
4use crossbeam::sync::Parker;
5use crossbeam::sync::Unparker;
6use fake::Fake;
7use fieldx_plus::fx_plus;
8use rand::rngs::ThreadRng;
9use rand::Rng;
10use rand_distr::Distribution;
11use rand_distr::Gamma;
12use statrs::distribution::ContinuousCDF;
13use statrs::distribution::Gamma as StatrsGamma;
14
15use crate::test::simulation::types::simerr;
16use crate::test::simulation::types::SimErrorAny;
17
18use super::entity::customer::Customer;
19use super::math::bisect;
20use super::reporter::Reporter;
21use super::reporter::SwReporter;
22use super::ScriptWriter;
23
24#[fx_plus(
25 child(ScriptWriter, unwrap(or_else(SimErrorAny, no_scenario))),
26 rc,
27 sync,
28 default(off)
29)]
30pub struct RndPool {
31 #[fieldx(lock, get(copy), get_mut, default(500))]
32 randoms_min: usize,
33
34 #[fieldx(lock, get(copy), get_mut, default(1000))]
35 randoms_full: usize,
36
37 #[fieldx(get, get_mut, default(SegQueue::new()))]
38 randoms: SegQueue<f64>,
39
40 #[fieldx(lock, get(copy), get_mut, default(50))]
41 customers_min: usize,
42
43 #[fieldx(lock, get(copy), get_mut, set, default(100))]
44 customers_full: usize,
45
46 #[fieldx(get(copy), default(0.15))]
48 min_customer_orders: f64,
49 #[fieldx(get(copy), default(3.))]
52 max_customer_orders: f64,
53 #[fieldx(get(copy), default(0.95))]
54 min_max_order_fraction: f64,
55 #[fieldx(copy, default(2.0))]
56 orders_gamma_shape: f64,
57
58 #[fieldx(get(copy), default(3.))]
61 max_erratic_quotient: f64,
62 #[fieldx(get(copy), default(0.9))]
64 eq_fraction: f64,
65 #[fieldx(get(copy), default(2.))]
66 eq_gamma_shape: f64,
67
68 #[fieldx(copy, default(0.00001))]
69 tolerance: f64,
70
71 #[fieldx(lock, get, get_mut, default(SegQueue::new()))]
72 customers: SegQueue<Customer>,
73
74 #[fieldx(lock, get(copy), get_mut, default(0))]
75 customer_id: i32,
76
77 #[fieldx(lazy, get)]
80 generator_unparker: Unparker,
81
82 #[fieldx(lazy)]
83 orders_gamma: Gamma<f64>,
84
85 #[fieldx(lazy)]
86 erratic_quotient_gamma: Gamma<f64>,
87
88 #[fieldx(lazy)]
89 reporter: Reporter,
90
91 #[fieldx(lock, private, get(copy), set, builder(off))]
92 shutdown: bool,
93}
94
// Generates `pub fn next_<method>(&self) -> <type>` accessors.
//
// Each entry is `<method>, <queue field>, <item type>`. The generated method:
//   1. wakes the generator thread if the queue holds fewer than `<queue field>_min` items;
//   2. pops an item, re-waking the generator after every miss until one is available.
// Note that step 2 is a spin: the caller does not sleep while the queue is empty.
macro_rules! next_method {
    ( $( $method:ident, $name:ident, $type:ty );+ $(;)? ) => {
        $( paste::paste! {
            #[inline(always)]
            pub fn [<next_ $method>](&self) -> $type {
                let running_low = self.$name().len() < self.[<$name _min>]();
                if running_low {
                    self.generator_unparker().unpark();
                }

                loop {
                    if let Some(item) = self.$name().pop() {
                        break item;
                    }
                    // Nothing to hand out yet: nudge the generator and try again.
                    self.generator_unparker().unpark();
                }
            }
        } )+
    };
}
117
118impl RndPool {
119 next_method! {
120 rand, randoms, f64;
121 customer, customers, Customer;
122 }
123
124 fn build_generator_unparker(&self) -> Unparker {
131 let parker = Parker::new();
132 let unparker = parker.unparker().clone();
133 self.start(parker);
134 unparker
135 }
136
137 fn solve_gamma_scale(&self, shape: f64, min_scale: f64, max_scale: f64, fraction: f64, upper_bound: f64) -> f64 {
138 bisect(min_scale, max_scale, fraction, self.tolerance, |scale| {
139 let gamma = StatrsGamma::new(shape, scale).unwrap();
140 let min = gamma.cdf(0.0);
141 let max = gamma.cdf(upper_bound);
142 max - min
143 })
144 .expect("failed to bisect gamma distribution scale")
145 }
146
147 fn build_orders_gamma(&self) -> Gamma<f64> {
148 Gamma::new(
149 self.orders_gamma_shape,
150 self.solve_gamma_scale(
151 self.orders_gamma_shape,
152 self.tolerance,
153 100.0,
154 self.min_max_order_fraction(),
155 self.max_customer_orders() - self.min_customer_orders(),
156 ),
157 )
158 .unwrap()
159 }
160
161 fn build_erratic_quotient_gamma(&self) -> Gamma<f64> {
162 Gamma::new(
163 self.eq_gamma_shape,
164 self.solve_gamma_scale(
165 self.eq_gamma_shape,
166 self.tolerance,
167 100.0,
168 self.eq_fraction(),
169 self.max_erratic_quotient() - 1.0,
170 ),
171 )
172 .unwrap()
173 }
174
175 fn build_reporter(&self) -> Reporter {
176 self.parent().unwrap().reporter().clone()
177 }
178
179 pub fn start(&self, parker: Parker) {
180 let me = self.myself().unwrap();
181 thread::Builder::new()
182 .name("rnd_pool".into())
183 .spawn(move || loop {
184 let mut rng = rand::rng();
185 while !me.shutdown() {
186 me.replenish(&mut rng);
187 parker.park();
188 }
189 })
190 .unwrap();
191 }
192
193 fn no_scenario(&self) -> SimErrorAny {
194 simerr!("Scenario object is gone!")
195 }
196
197 fn replenish(&self, rng: &mut ThreadRng) {
198 let mut incomplete = true;
199 self.reporter()
200 .set_rnd_pool_task_status(super::reporter::TaskStatus::Busy);
201
202 if self.randoms().is_empty() {
203 let _ = self.reporter().out(&format!(
204 "Doubling randoms pool size from {}/{}",
205 self.randoms_min(),
206 self.randoms_full()
207 ));
208 *self.randoms_full_mut() *= 2;
209 *self.randoms_min_mut() *= 2;
210 }
211
212 if self.customers().is_empty() {
213 let _ = self.reporter().out(&format!(
214 "Doubling customers pool size from {}/{}",
215 self.customers_min(),
216 self.customers_full()
217 ));
218 *self.customers_full_mut() *= 2;
219 *self.customers_min_mut() *= 2;
220 }
221
222 while incomplete {
223 incomplete = false;
224 if self.randoms().len() < self.randoms_full() {
225 self.randoms().push(rng.random_range(0.0..=1.0));
226 incomplete = true;
227 }
228
229 if self.customers().len() < self.customers_full() {
230 let customer = self.new_customer(rng);
231 self.customers().push(customer);
232 incomplete = true;
233 }
234 }
235 self.reporter()
236 .set_rnd_pool_task_status(super::reporter::TaskStatus::Idle);
237 }
238
239 fn new_customer(&self, rng: &mut ThreadRng) -> Customer {
240 *self.customer_id_mut() += 1;
241 let email = format!(
242 "{}{}@{}.{}",
243 fake::faker::lorem::en::Word().fake::<String>(),
244 self.customer_id(),
245 fake::faker::lorem::en::Word().fake::<String>(),
246 fake::faker::internet::en::DomainSuffix().fake::<String>(),
247 );
248 Customer::builder()
249 .id(self.customer_id())
250 .first_name(fake::faker::name::en::FirstName().fake())
251 .last_name(fake::faker::name::en::LastName().fake())
252 .email(email.clone())
253 .erratic_quotient(self.erratic_quotient_gamma().sample(rng))
254 .orders_per_day(self.orders_gamma().sample(rng) + self.min_customer_orders())
255 .build()
256 .unwrap()
257 }
258}
259
260impl Drop for RndPool {
261 fn drop(&mut self) {
262 self.set_shutdown(true);
263 self.generator_unparker().unpark();
264 }
265}