wb_cache/test/simulation/
company_plain.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use async_trait::async_trait;
6use fieldx_plus::fx_plus;
7use indicatif::ProgressBar;
8use sea_orm::prelude::*;
9use sea_orm::ActiveValue::Set;
10use sea_orm::IntoActiveModel;
11
12use super::actor::TestActor;
13use super::db::cache::DBProvider;
14use super::db::driver::DatabaseDriver;
15use super::db::prelude::*;
16use super::progress::MaybeProgress;
17use super::progress::PStyle;
18use super::scriptwriter::steps::ScriptTitle;
19use super::types::simerr;
20use super::types::OrderStatus;
21use super::types::SimError;
22use super::types::SimErrorAny;
23use super::SimulationApp;
24
25const WATCH_PRODUCT_ID: Option<i32> = None; macro_rules! watch_product {
28 ($record:expr, $( $msg:tt )+ ) => {
29 if WATCH_PRODUCT_ID.is_some_and(|id| id < 0 || id == $record.product_id) {
30 eprintln!( $( $msg )+ );
31 }
32 };
33}
34
35#[derive(Debug)]
36#[fx_plus(
37 agent(APP, unwrap(or_else(SimErrorAny, <APP as SimulationApp>::app_is_gone()))),
38 parent,
39 fallible(off, error(SimErrorAny)),
40 default(off),
41 sync
42)]
43pub struct TestCompany<APP: SimulationApp, D: DatabaseDriver> {
44 #[fieldx(copy, get("_current_day"), inner_mut, set("_set_current_day", private), default(0))]
45 current_day: i32,
46
47 #[fieldx(lazy, get("_progress", private, clone), fallible)]
48 #[allow(clippy::type_complexity)]
49 progress: Arc<Option<ProgressBar>>,
50
51 #[fieldx(get(clone), builder(required))]
52 db: Arc<D>,
53
54 #[fieldx(inner_mut, get(copy), set, default(0))]
55 inv_check_no: u32,
56
57 #[fieldx(inner_mut, get, get_mut, default(HashMap::new()))]
58 updated_from: HashMap<Uuid, Order>,
59
60 #[fieldx(inner_mut, set, get(copy), builder(off), default(Instant::now()))]
61 started: Instant,
62}
63
64impl<APP: SimulationApp, D: DatabaseDriver> TestCompany<APP, D> {
65 fn build_progress(&self) -> Result<Arc<Option<ProgressBar>>, SimErrorAny> {
66 let app = self.app()?;
67 let progress = app.acquire_progress(PStyle::Main, None)?;
68
69 Ok(Arc::new(progress))
70 }
71
72 async fn update_inventory(&self, db: &DatabaseConnection, order: &Order) -> Result<(), SimError> {
73 match order.status {
74 OrderStatus::Backordered | OrderStatus::Refunded | OrderStatus::Shipped | OrderStatus::Recheck => (),
75 _ => {
76 watch_product!(
77 order,
78 "Updating inventory from order {}, status {:?}, product {}: {}",
79 order.id,
80 order.status,
81 order.product_id,
82 order.quantity
83 );
84 if self.updated_from().contains_key(&order.id) {
85 return Err(simerr!(
86 "Inventory already updated from order {:?}",
87 self.updated_from().get(&order.id)
88 )
89 .into());
90 }
91 self.updated_from_mut().insert(order.id, order.clone());
92 self.update_inventory_record(db, order.product_id, -(order.quantity as i64))
93 .await?;
94 }
95 }
96 Ok(())
97 }
98}
99
100#[async_trait]
101impl<APP, D> TestActor<APP> for TestCompany<APP, D>
102where
103 APP: SimulationApp,
104 D: DatabaseDriver,
105{
106 fn prelude(&self) -> Result<(), SimError> {
107 self.set_started(Instant::now());
108 self.progress()?.maybe_set_prefix("Plain ");
109 Ok(())
110 }
111
112 fn progress(&self) -> Result<Arc<Option<ProgressBar>>, SimError> {
113 Ok(self._progress()?)
114 }
115
116 async fn set_current_day(&self, day: i32) -> Result<(), SimError> {
117 self._set_current_day(day);
118 Ok(())
119 }
120
121 fn current_day(&self) -> i32 {
122 self._current_day()
123 }
124
125 fn set_title(&self, _title: &ScriptTitle) -> Result<(), SimError> {
126 Ok(())
127 }
128
129 async fn add_customer(&self, db: &DatabaseConnection, customer: &Customer) -> Result<(), SimError> {
130 let mut customer = customer.clone();
131 customer.registered_on = self.current_day();
132 Customers::insert(customer.into_active_model()).exec(db).await?;
133 Ok(())
134 }
135
136 async fn add_product(&self, db: &DatabaseConnection, product: &Product) -> Result<(), SimError> {
137 Products::insert(product.clone().into_active_model()).exec(db).await?;
138 Ok(())
139 }
140
141 async fn add_inventory_record(
142 &self,
143 db: &DatabaseConnection,
144 inventory_record: &InventoryRecord,
145 ) -> Result<(), SimError> {
146 InventoryRecords::insert(inventory_record.clone().into_active_model())
147 .exec(db)
148 .await?;
149 Ok(())
150 }
151
152 async fn add_order(&self, db: &DatabaseConnection, order: &Order) -> Result<(), SimError> {
153 watch_product!(
154 order,
155 "Adding order {} on product {}: {}, {:?}",
156 order.id,
157 order.product_id,
158 order.quantity,
159 order.status
160 );
161 self.update_inventory(db, order).await?;
162
163 let mut order = order.clone();
164 order.purchased_on = self.current_day();
165 order.into_active_model().insert(db).await?;
166
167 Ok(())
168 }
169
170 async fn update_order(&self, db: &DatabaseConnection, order: &Order) -> Result<(), SimError> {
171 watch_product!(
172 order,
173 "Updating order {} on product {}: {}, {:?}",
174 order.id,
175 order.product_id,
176 order.quantity,
177 order.status
178 );
179 self.update_inventory(db, order).await?;
180
181 let mut order_update = super::db::entity::order::ActiveModel::new();
182 order_update.id = Set(order.id);
183 order_update.status = Set(order.status);
184 order_update.update(db).await?;
185
186 Ok(())
187 }
188
189 async fn update_inventory_record(
190 &self,
191 db: &DatabaseConnection,
192 product_id: i32,
193 quantity: i64,
194 ) -> Result<(), SimError> {
195 let Some(inventory_record) = InventoryRecords::find_by_id(product_id).one(db).await?
196 else {
197 return Err(simerr!(
198 "Can't update non-existing inventory record for product ID: {}",
199 product_id
200 )
201 .into());
202 };
203
204 let new_stock = inventory_record.stock + quantity;
205 if new_stock < 0 {
206 return Err(simerr!(
207 "Not enough stock for product ID {}: need {}, but only {} remaining",
208 product_id,
209 -quantity,
210 inventory_record.stock
211 )
212 .into());
213 }
214 watch_product!(
215 inventory_record,
216 "Updating inventory record for product ID {}: {} -> {}",
217 product_id,
218 inventory_record.stock,
219 new_stock
220 );
221 let mut inventory_record = inventory_record.into_active_model();
222 inventory_record.product_id = Set(product_id);
223 inventory_record.stock = Set(new_stock);
224 inventory_record.update(db).await?;
225
226 Ok(())
227 }
228
229 async fn check_inventory(
230 &self,
231 db: &DatabaseConnection,
232 product_id: i32,
233 stock: i64,
234 comment: &str,
235 ) -> Result<(), SimError> {
236 let inventory_record = InventoryRecords::find_by_id(product_id).one(db).await?;
237
238 self.inv_rec_compare(&inventory_record, product_id, stock, comment)?;
239
240 let inv_check_no = self.inv_check_no() + 1;
241 self.set_inv_check_no(inv_check_no);
242
243 Ok(())
244 }
245
246 async fn add_session(&self, db: &DatabaseConnection, session: &Session) -> Result<(), SimError> {
247 let session = session.clone().into_active_model();
248 Sessions::insert(session).exec(db).await?;
249 Ok(())
250 }
251
252 async fn update_product_view_count(&self, db: &DatabaseConnection, product_id: i32) -> Result<(), SimError> {
253 Products::update_many()
254 .col_expr(
255 super::db::entity::product::Column::Views,
256 sea_orm::sea_query::SimpleExpr::Custom("views + 1".to_string()),
257 )
258 .filter(super::db::entity::product::Column::Id.eq(product_id))
259 .exec(db)
260 .await?;
261 Ok(())
262 }
263
264 async fn update_session(&self, db: &DatabaseConnection, session: &Session) -> Result<(), SimError> {
265 let mut session_update = super::db::entity::session::ActiveModel::new();
266 session_update.id = Set(session.id);
267 session_update.customer_id = Set(session.customer_id);
268 session_update.expires_on = Set(session.expires_on);
269 session_update.update(db).await?;
270
271 Ok(())
272 }
273
274 async fn collect_sessions(&self, db: &DatabaseConnection) -> Result<(), SimError> {
275 let res = Sessions::delete_many()
276 .filter(super::db::entity::session::Column::ExpiresOn.lte(self.current_day()))
277 .exec(db)
278 .await?;
279
280 if res.rows_affected == 0 {
281 self.progress()?.maybe_set_message("");
282 }
283 else {
284 self.progress()?
285 .maybe_set_message(format!("Collected {} sessions", res.rows_affected));
286 }
287
288 Ok(())
289 }
290
291 async fn step_complete(&self, _db: &DatabaseConnection, step_num: usize) -> Result<(), SimError> {
292 let elapsed = self.started().elapsed().as_secs_f64();
293 self.app()?.set_plain_per_sec(step_num as f64 / elapsed);
294 Ok(())
295 }
296}
297
298#[async_trait]
299impl<APP, D> DBProvider for TestCompany<APP, D>
300where
301 APP: SimulationApp,
302 D: DatabaseDriver,
303{
304 fn db_driver(&self) -> Result<Arc<impl DatabaseDriver>, SimErrorAny> {
305 Ok(self.db())
306 }
307
308 fn db_connection(&self) -> Result<DatabaseConnection, SimErrorAny> {
309 Ok(self.db().connection())
310 }
311}