wb_cache/test/simulation/
company_plain.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use async_trait::async_trait;
6use fieldx_plus::fx_plus;
7use indicatif::ProgressBar;
8use sea_orm::prelude::*;
9use sea_orm::ActiveValue::Set;
10use sea_orm::IntoActiveModel;
11
12use super::actor::TestActor;
13use super::db::cache::DBProvider;
14use super::db::driver::DatabaseDriver;
15use super::db::prelude::*;
16use super::progress::MaybeProgress;
17use super::progress::PStyle;
18use super::scriptwriter::steps::ScriptTitle;
19use super::types::simerr;
20use super::types::OrderStatus;
21use super::types::SimError;
22use super::types::SimErrorAny;
23use super::SimulationApp;
24
25const WATCH_PRODUCT_ID: Option<i32> = None; // Some(1);
26
27macro_rules! watch_product {
28    ($record:expr, $( $msg:tt )+ ) => {
29        if WATCH_PRODUCT_ID.is_some_and(|id| id < 0 || id == $record.product_id) {
30            eprintln!( $( $msg )+ );
31        }
32    };
33}
34
35#[derive(Debug)]
36#[fx_plus(
37    agent(APP, unwrap(or_else(SimErrorAny, <APP as SimulationApp>::app_is_gone()))),
38    parent,
39    fallible(off, error(SimErrorAny)),
40    default(off),
41    sync
42)]
43pub struct TestCompany<APP: SimulationApp, D: DatabaseDriver> {
44    #[fieldx(copy, get("_current_day"), inner_mut, set("_set_current_day", private), default(0))]
45    current_day: i32,
46
47    #[fieldx(lazy, get("_progress", private, clone), fallible)]
48    #[allow(clippy::type_complexity)]
49    progress: Arc<Option<ProgressBar>>,
50
51    #[fieldx(get(clone), builder(required))]
52    db: Arc<D>,
53
54    #[fieldx(inner_mut, get(copy), set, default(0))]
55    inv_check_no: u32,
56
57    #[fieldx(inner_mut, get, get_mut, default(HashMap::new()))]
58    updated_from: HashMap<Uuid, Order>,
59
60    #[fieldx(inner_mut, set, get(copy), builder(off), default(Instant::now()))]
61    started: Instant,
62}
63
64impl<APP: SimulationApp, D: DatabaseDriver> TestCompany<APP, D> {
65    fn build_progress(&self) -> Result<Arc<Option<ProgressBar>>, SimErrorAny> {
66        let app = self.app()?;
67        let progress = app.acquire_progress(PStyle::Main, None)?;
68
69        Ok(Arc::new(progress))
70    }
71
72    async fn update_inventory(&self, db: &DatabaseConnection, order: &Order) -> Result<(), SimError> {
73        match order.status {
74            OrderStatus::Backordered | OrderStatus::Refunded | OrderStatus::Shipped | OrderStatus::Recheck => (),
75            _ => {
76                watch_product!(
77                    order,
78                    "Updating inventory from order {}, status {:?}, product {}: {}",
79                    order.id,
80                    order.status,
81                    order.product_id,
82                    order.quantity
83                );
84                if self.updated_from().contains_key(&order.id) {
85                    return Err(simerr!(
86                        "Inventory already updated from order {:?}",
87                        self.updated_from().get(&order.id)
88                    )
89                    .into());
90                }
91                self.updated_from_mut().insert(order.id, order.clone());
92                self.update_inventory_record(db, order.product_id, -(order.quantity as i64))
93                    .await?;
94            }
95        }
96        Ok(())
97    }
98}
99
100#[async_trait]
101impl<APP, D> TestActor<APP> for TestCompany<APP, D>
102where
103    APP: SimulationApp,
104    D: DatabaseDriver,
105{
106    fn prelude(&self) -> Result<(), SimError> {
107        self.set_started(Instant::now());
108        self.progress()?.maybe_set_prefix("Plain ");
109        Ok(())
110    }
111
112    fn progress(&self) -> Result<Arc<Option<ProgressBar>>, SimError> {
113        Ok(self._progress()?)
114    }
115
116    async fn set_current_day(&self, day: i32) -> Result<(), SimError> {
117        self._set_current_day(day);
118        Ok(())
119    }
120
121    fn current_day(&self) -> i32 {
122        self._current_day()
123    }
124
125    fn set_title(&self, _title: &ScriptTitle) -> Result<(), SimError> {
126        Ok(())
127    }
128
129    async fn add_customer(&self, db: &DatabaseConnection, customer: &Customer) -> Result<(), SimError> {
130        let mut customer = customer.clone();
131        customer.registered_on = self.current_day();
132        Customers::insert(customer.into_active_model()).exec(db).await?;
133        Ok(())
134    }
135
136    async fn add_product(&self, db: &DatabaseConnection, product: &Product) -> Result<(), SimError> {
137        Products::insert(product.clone().into_active_model()).exec(db).await?;
138        Ok(())
139    }
140
141    async fn add_inventory_record(
142        &self,
143        db: &DatabaseConnection,
144        inventory_record: &InventoryRecord,
145    ) -> Result<(), SimError> {
146        InventoryRecords::insert(inventory_record.clone().into_active_model())
147            .exec(db)
148            .await?;
149        Ok(())
150    }
151
152    async fn add_order(&self, db: &DatabaseConnection, order: &Order) -> Result<(), SimError> {
153        watch_product!(
154            order,
155            "Adding order {} on product {}: {}, {:?}",
156            order.id,
157            order.product_id,
158            order.quantity,
159            order.status
160        );
161        self.update_inventory(db, order).await?;
162
163        let mut order = order.clone();
164        order.purchased_on = self.current_day();
165        order.into_active_model().insert(db).await?;
166
167        Ok(())
168    }
169
170    async fn update_order(&self, db: &DatabaseConnection, order: &Order) -> Result<(), SimError> {
171        watch_product!(
172            order,
173            "Updating order {} on product {}: {}, {:?}",
174            order.id,
175            order.product_id,
176            order.quantity,
177            order.status
178        );
179        self.update_inventory(db, order).await?;
180
181        let mut order_update = super::db::entity::order::ActiveModel::new();
182        order_update.id = Set(order.id);
183        order_update.status = Set(order.status);
184        order_update.update(db).await?;
185
186        Ok(())
187    }
188
189    async fn update_inventory_record(
190        &self,
191        db: &DatabaseConnection,
192        product_id: i32,
193        quantity: i64,
194    ) -> Result<(), SimError> {
195        let Some(inventory_record) = InventoryRecords::find_by_id(product_id).one(db).await?
196        else {
197            return Err(simerr!(
198                "Can't update non-existing inventory record for product ID: {}",
199                product_id
200            )
201            .into());
202        };
203
204        let new_stock = inventory_record.stock + quantity;
205        if new_stock < 0 {
206            return Err(simerr!(
207                "Not enough stock for product ID {}: need {}, but only {} remaining",
208                product_id,
209                -quantity,
210                inventory_record.stock
211            )
212            .into());
213        }
214        watch_product!(
215            inventory_record,
216            "Updating inventory record for product ID {}: {} -> {}",
217            product_id,
218            inventory_record.stock,
219            new_stock
220        );
221        let mut inventory_record = inventory_record.into_active_model();
222        inventory_record.product_id = Set(product_id);
223        inventory_record.stock = Set(new_stock);
224        inventory_record.update(db).await?;
225
226        Ok(())
227    }
228
229    async fn check_inventory(
230        &self,
231        db: &DatabaseConnection,
232        product_id: i32,
233        stock: i64,
234        comment: &str,
235    ) -> Result<(), SimError> {
236        let inventory_record = InventoryRecords::find_by_id(product_id).one(db).await?;
237
238        self.inv_rec_compare(&inventory_record, product_id, stock, comment)?;
239
240        let inv_check_no = self.inv_check_no() + 1;
241        self.set_inv_check_no(inv_check_no);
242
243        Ok(())
244    }
245
246    async fn add_session(&self, db: &DatabaseConnection, session: &Session) -> Result<(), SimError> {
247        let session = session.clone().into_active_model();
248        Sessions::insert(session).exec(db).await?;
249        Ok(())
250    }
251
252    async fn update_product_view_count(&self, db: &DatabaseConnection, product_id: i32) -> Result<(), SimError> {
253        Products::update_many()
254            .col_expr(
255                super::db::entity::product::Column::Views,
256                sea_orm::sea_query::SimpleExpr::Custom("views + 1".to_string()),
257            )
258            .filter(super::db::entity::product::Column::Id.eq(product_id))
259            .exec(db)
260            .await?;
261        Ok(())
262    }
263
264    async fn update_session(&self, db: &DatabaseConnection, session: &Session) -> Result<(), SimError> {
265        let mut session_update = super::db::entity::session::ActiveModel::new();
266        session_update.id = Set(session.id);
267        session_update.customer_id = Set(session.customer_id);
268        session_update.expires_on = Set(session.expires_on);
269        session_update.update(db).await?;
270
271        Ok(())
272    }
273
274    async fn collect_sessions(&self, db: &DatabaseConnection) -> Result<(), SimError> {
275        let res = Sessions::delete_many()
276            .filter(super::db::entity::session::Column::ExpiresOn.lte(self.current_day()))
277            .exec(db)
278            .await?;
279
280        if res.rows_affected == 0 {
281            self.progress()?.maybe_set_message("");
282        }
283        else {
284            self.progress()?
285                .maybe_set_message(format!("Collected {} sessions", res.rows_affected));
286        }
287
288        Ok(())
289    }
290
291    async fn step_complete(&self, _db: &DatabaseConnection, step_num: usize) -> Result<(), SimError> {
292        let elapsed = self.started().elapsed().as_secs_f64();
293        self.app()?.set_plain_per_sec(step_num as f64 / elapsed);
294        Ok(())
295    }
296}
297
298#[async_trait]
299impl<APP, D> DBProvider for TestCompany<APP, D>
300where
301    APP: SimulationApp,
302    D: DatabaseDriver,
303{
304    fn db_driver(&self) -> Result<Arc<impl DatabaseDriver>, SimErrorAny> {
305        Ok(self.db())
306    }
307
308    fn db_connection(&self) -> Result<DatabaseConnection, SimErrorAny> {
309        Ok(self.db().connection())
310    }
311}