wb_cache/test/simulation/
company_cached.rs

1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::fmt::Debug;
4use std::sync::Arc;
5use std::time::Duration;
6use std::time::Instant;
7
8use async_trait::async_trait;
9use fieldx_plus::child_build;
10use fieldx_plus::fx_plus;
11use indicatif::ProgressBar;
12use sea_orm::prelude::*;
13use sea_orm::ActiveValue;
14use sea_orm::DatabaseConnection;
15use sea_orm::EntityTrait;
16use sea_orm::QuerySelect;
17use tracing::debug;
18use tracing::instrument;
19
20use crate::cache;
21use crate::traits::Observer;
22use crate::update_iterator::UpdateIterator;
23use crate::Cache;
24
25use super::actor::TestActor;
26use super::db::cache::CacheUpdates;
27use super::db::cache::DBProvider;
28use super::db::driver::DatabaseDriver;
29use super::db::entity::customer::CustomerBy;
30use super::db::entity::session;
31use super::db::entity::*;
32use super::progress::MaybeProgress;
33use super::progress::PStyle;
34use super::scriptwriter::steps::ScriptTitle;
35use super::types::simerr;
36use super::types::OrderStatus;
37use super::types::SimError;
38use super::types::SimErrorAny;
39use super::SimulationApp;
40
41type CustomerCache<APP, D> = Arc<Cache<CustomerMgr<TestCompany<APP, D>>>>;
42type InvRecCache<APP, D> = Arc<Cache<InventoryRecordMgr<TestCompany<APP, D>>>>;
43type OrderCache<APP, D> = Arc<Cache<OrderMgr<TestCompany<APP, D>>>>;
44type ProductCache<APP, D> = Arc<Cache<ProductMgr<TestCompany<APP, D>>>>;
45type SessionCache<APP, D> = Arc<Cache<SessionMgr<TestCompany<APP, D>>>>;
46
47#[fx_plus(child(TestCompany<APP, D>, unwrap), sync)]
48struct OrderObserver<APP, D>
49where
50    APP: SimulationApp,
51    D: DatabaseDriver, {}
52
53#[async_trait]
54impl<APP, D> Observer<OrderMgr<TestCompany<APP, D>>> for OrderObserver<APP, D>
55where
56    APP: SimulationApp,
57    D: DatabaseDriver,
58{
59    async fn on_flush(&self, _updates: Arc<UpdateIterator<OrderMgr<TestCompany<APP, D>>>>) -> Result<(), SimErrorAny> {
60        let parent = self.parent();
61
62        // parent
63        //     .app()?
64        //     .report_debug(format!("OrderObserver::on_flush: {}", _updates.len()));
65        debug!("OrderObserver::on_flush: {}", _updates.len());
66
67        parent.customer_cache()?.flush_raw().await?;
68        Ok(())
69    }
70
71    async fn on_flush_one(
72        &self,
73        key: &Uuid,
74        update: &CacheUpdates<super::db::entity::order::ActiveModel>,
75    ) -> Result<(), Arc<SimErrorAny>> {
76        // self.parent()
77        //     .app()?
78        //     .report_debug(format!("OrderObserver::on_flush_one: {}", key));
79        debug!("OrderObserver::on_flush_one: {}", key);
80
81        match update {
82            CacheUpdates::Insert(am) | CacheUpdates::Update(am) => {
83                let company = self.parent();
84
85                match am.customer_id {
86                    ActiveValue::Set(_customer_id) | ActiveValue::Unchanged(_customer_id) => {
87                        company.customer_cache()?.flush().await?;
88                    }
89                    _ => (),
90                }
91                match am.product_id {
92                    ActiveValue::Set(_product_id) | ActiveValue::Unchanged(_product_id) => {
93                        company.product_cache()?.flush().await?;
94                    }
95                    _ => (),
96                }
97            }
98            CacheUpdates::Delete => (),
99        }
100        Ok(())
101    }
102
103    async fn on_monitor_error(&self, error: &Arc<SimErrorAny>) {
104        self.parent()
105            .app()
106            .unwrap()
107            .report_error(format!("OrderObserver::on_monitor_error: {error:?}"));
108    }
109
110    async fn on_debug(&self, message: &str) {
111        debug!("[orders] {}", message);
112        // self.parent()
113        //     .app()
114        //     .unwrap()
115        //     .report_debug(format!("[orders] {}", message));
116    }
117}
118
119#[derive(Debug)]
120#[fx_plus(child(TestCompany<APP,D>, unwrap), sync)]
121struct SessionObserver<APP, D>
122where
123    APP: SimulationApp,
124    D: DatabaseDriver, {}
125
126#[async_trait]
127impl<APP, D> Observer<SessionMgr<TestCompany<APP, D>>> for SessionObserver<APP, D>
128where
129    APP: SimulationApp,
130    D: DatabaseDriver,
131{
132    async fn on_flush(&self, updates: Arc<UpdateIterator<SessionMgr<TestCompany<APP, D>>>>) -> Result<(), SimErrorAny> {
133        // self.parent()
134        //     .app()?
135        //     .report_debug(format!("SessionObserver::on_flush: {}", updates.len()));
136        debug!("SessionObserver::on_flush: {}", updates.len());
137
138        let mut customer_ids: HashSet<CustomerBy> = HashSet::new();
139        while let Some(update) = updates.next() {
140            match update.update() {
141                CacheUpdates::Insert(am) | CacheUpdates::Update(am) => match am.customer_id {
142                    ActiveValue::Set(Some(customer_id)) | ActiveValue::Unchanged(Some(customer_id)) => {
143                        customer_ids.insert(CustomerBy::Id(customer_id));
144                    }
145                    _ => (),
146                },
147                CacheUpdates::Delete => (),
148            }
149        }
150        self.parent()
151            .customer_cache()?
152            .flush_many_raw(customer_ids.into_iter().collect::<Vec<_>>())
153            .await?;
154        Ok(())
155    }
156
157    async fn on_flush_one(
158        &self,
159        key: &i64,
160        update: &CacheUpdates<super::db::entity::session::ActiveModel>,
161    ) -> Result<(), Arc<SimErrorAny>> {
162        // self.parent()
163        //     .app()?
164        //     .report_debug(format!("SessionObserver::on_flush_one: {}", key));
165        debug!("SessionObserver::on_flush_one: {}", key);
166
167        match update {
168            CacheUpdates::Insert(am) | CacheUpdates::Update(am) => match am.customer_id {
169                ActiveValue::Set(Some(customer_id)) | ActiveValue::Unchanged(Some(customer_id)) => {
170                    self.parent()
171                        .customer_cache()?
172                        .flush_one(&CustomerBy::Id(customer_id))
173                        .await?;
174                }
175                _ => (),
176            },
177            CacheUpdates::Delete => (),
178        }
179        Ok(())
180    }
181
182    async fn on_monitor_error(&self, error: &Arc<SimErrorAny>) {
183        self.parent()
184            .app()
185            .unwrap()
186            .report_error(format!("SessionObserver::on_monitor_error: {error:?}"));
187    }
188
189    async fn on_debug(&self, message: &str) {
190        debug!("[sessions] {}", message);
191        // self.parent()
192        //     .app()
193        //     .unwrap()
194        //     .report_debug(format!("[sessions] {}", message));
195    }
196}
197
198#[fx_plus(
199    agent(APP, unwrap(or_else(SimErrorAny, <APP as SimulationApp>::app_is_gone()))),
200    parent,
201    fallible(off, error(SimErrorAny)),
202    sync,
203    default(off)
204)]
205#[allow(clippy::type_complexity)]
206pub struct TestCompany<APP: SimulationApp, D: DatabaseDriver> {
207    #[fieldx(copy, get("_current_day"), inner_mut, set("_set_current_day", private), default(0))]
208    current_day: i32,
209
210    #[fieldx(optional, private, inner_mut, get("_product_count", copy), set, builder(off))]
211    product_count: i32,
212
213    #[fieldx(optional, private, inner_mut, get("_market_capacity", copy), set, builder(off))]
214    market_capacity: u32,
215
216    #[fieldx(lazy, get("_progress", private, clone), fallible)]
217    progress: Arc<Option<ProgressBar>>,
218
219    #[fieldx(get(clone), builder(required))]
220    db: Arc<D>,
221
222    #[fieldx(inner_mut, get(copy), set, default(0))]
223    inv_check_no: u32,
224
225    #[fieldx(inner_mut, get, get_mut, default(HashMap::new()))]
226    updated_from: HashMap<Uuid, Order>,
227
228    #[fieldx(lazy, fallible, get(clone))]
229    customer_cache: CustomerCache<APP, D>,
230
231    #[fieldx(lazy, fallible, get(clone))]
232    inv_rec_cache: InvRecCache<APP, D>,
233
234    #[fieldx(lazy, fallible, get(clone))]
235    order_cache: OrderCache<APP, D>,
236
237    #[fieldx(lazy, fallible, get(clone))]
238    product_cache: ProductCache<APP, D>,
239
240    #[fieldx(lazy, fallible, get(clone))]
241    session_cache: SessionCache<APP, D>,
242
243    #[fieldx(inner_mut, set, get(copy), builder(off), default(Instant::now()))]
244    started: Instant,
245}
246
247impl<APP, D> Debug for TestCompany<APP, D>
248where
249    APP: SimulationApp,
250    D: DatabaseDriver,
251{
252    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253        f.debug_struct("TestCompany")
254            .field("current_day", &self.current_day())
255            .field("product_count", &self.product_count())
256            .finish()
257    }
258}
259
260impl<APP: SimulationApp, D: DatabaseDriver> TestCompany<APP, D> {
261    fn build_progress(&self) -> Result<Arc<Option<ProgressBar>>, SimErrorAny> {
262        let app = self.app()?;
263        let progress = app.acquire_progress(PStyle::Main, None)?;
264
265        Ok(Arc::new(progress))
266    }
267
268    fn build_customer_cache(&self) -> Result<CustomerCache<APP, D>, SimErrorAny> {
269        let customer_cache = child_build!(self, CustomerMgr<TestCompany<APP, D>>)?;
270        Ok(Cache::builder()
271            .name("customers")
272            .data_controller(customer_cache)
273            .max_updates(self.market_capacity()? as u64)
274            .max_capacity(self.market_capacity()? as u64)
275            .flush_interval(Duration::from_secs(600))
276            .build()?)
277    }
278
279    fn build_inv_rec_cache(&self) -> Result<InvRecCache<APP, D>, SimErrorAny> {
280        let inv_rec_cache = child_build!(self, InventoryRecordMgr<TestCompany<APP, D>>)?;
281        Ok(Cache::builder()
282            .name("inventory records")
283            .data_controller(inv_rec_cache)
284            .max_updates(self.product_count()? as u64)
285            .max_capacity(self.product_count()? as u64)
286            .flush_interval(Duration::from_secs(600))
287            .build()?)
288    }
289
290    fn build_order_cache(&self) -> Result<OrderCache<APP, D>, SimErrorAny> {
291        let order_cache = child_build!(self, OrderMgr<TestCompany<APP, D>>)?;
292        let order_observer = child_build!(self, OrderObserver<APP,D>)?;
293        // Cache size is set based on the expectation that we may need to re-process one order per day per customer.
294        // Re-process means handling a refund or shipping a backordered one.
295        Ok(Cache::builder()
296            .name("orders")
297            .data_controller(order_cache)
298            .max_updates(self.market_capacity()? as u64 * 100)
299            .max_capacity((self.market_capacity()? as u64 * 1000).max(1_000_000))
300            .flush_interval(Duration::from_secs(600))
301            .observer(order_observer)
302            .build()?)
303    }
304
305    fn build_product_cache(&self) -> Result<ProductCache<APP, D>, SimErrorAny> {
306        let product_cache = child_build!(self, ProductMgr<TestCompany<APP, D>>)?;
307        Ok(Cache::builder()
308            .name("products")
309            .data_controller(product_cache)
310            .max_updates(self.product_count()? as u64)
311            .max_capacity(self.product_count()? as u64)
312            .flush_interval(Duration::from_secs(60))
313            .build()?)
314    }
315
316    fn build_session_cache(&self) -> Result<SessionCache<APP, D>, SimErrorAny> {
317        let session_cache = child_build!(self, SessionMgr<TestCompany<APP, D>>)?;
318        let session_observer = child_build!(self, SessionObserver<APP,D>)?;
319        Ok(Cache::builder()
320            .name("sessions")
321            .data_controller(session_cache)
322            .max_updates((self.market_capacity()? as u64 * 100).max(100_000))
323            .max_capacity((self.market_capacity()? as u64 * 1000).max(1_000_000))
324            .flush_interval(Duration::from_secs(600))
325            .observer(session_observer)
326            .build()?)
327    }
328
329    fn product_count(&self) -> Result<i32, SimErrorAny> {
330        self._product_count().ok_or_else(|| simerr!("Product count is not set"))
331    }
332
333    fn market_capacity(&self) -> Result<u32, SimErrorAny> {
334        self._market_capacity()
335            .ok_or_else(|| simerr!("Market capacity is not set"))
336    }
337
338    #[instrument(level = "trace", skip(db))]
339    async fn update_inventory(&self, db: &DatabaseConnection, order: &Order) -> Result<(), SimError> {
340        match order.status {
341            OrderStatus::Backordered | OrderStatus::Refunded | OrderStatus::Shipped | OrderStatus::Recheck => (),
342            _ => {
343                if self.updated_from().contains_key(&order.id) {
344                    // Prevent accidental double update
345                    Err(simerr!(
346                        "Inventory already updated from order {:?}",
347                        self.updated_from().get(&order.id)
348                    ))?;
349                }
350                self.updated_from_mut().insert(order.id, order.clone());
351                self.update_inventory_record(db, order.product_id, -(order.quantity as i64))
352                    .await?;
353            }
354        }
355        Ok(())
356    }
357}
358
359#[async_trait]
360impl<APP, D> TestActor<APP> for TestCompany<APP, D>
361where
362    APP: SimulationApp,
363    D: DatabaseDriver,
364{
365    fn prelude(&self) -> Result<(), SimError> {
366        self.set_started(Instant::now());
367        self.progress()?.maybe_set_prefix("Cached");
368        Ok(())
369    }
370
371    async fn set_current_day(&self, day: i32) -> Result<(), SimError> {
372        if day == 1 {
373            self.customer_cache()?.flush().await?;
374            self.product_cache()?.flush().await?;
375            self.inv_rec_cache()?.flush().await?;
376        }
377        self._set_current_day(day);
378        self.session_cache()?.soft_flush().await?;
379        self.order_cache()?.soft_flush().await?;
380        Ok(())
381    }
382
383    #[inline(always)]
384    fn current_day(&self) -> i32 {
385        self._current_day()
386    }
387
388    fn progress(&self) -> Result<Arc<Option<ProgressBar>>, SimError> {
389        Ok(self._progress()?)
390    }
391
392    fn set_title(&self, title: &ScriptTitle) -> Result<(), SimError> {
393        self.set_product_count(title.products);
394        self.set_market_capacity(title.market_capacity);
395        Ok(())
396    }
397
398    #[instrument(level = "trace", skip(self, _db))]
399    async fn add_customer(&self, _db: &DatabaseConnection, customer: &Customer) -> Result<(), SimError> {
400        let mut customer = customer.clone();
401        customer.registered_on = self.current_day();
402        self.customer_cache()?.insert(customer.clone()).await?;
403        Ok(())
404    }
405
406    #[instrument(level = "trace", skip(self, _db))]
407    async fn add_product(&self, _db: &DatabaseConnection, product: &Product) -> Result<(), SimError> {
408        self.product_cache()?.insert(product.clone()).await?;
409        Ok(())
410    }
411
412    #[instrument(level = "trace", skip(self, _db))]
413    async fn add_inventory_record(
414        &self,
415        _db: &DatabaseConnection,
416        inventory_record: &InventoryRecord,
417    ) -> Result<(), SimError> {
418        self.product_cache()?.flush().await?;
419        self.inv_rec_cache()?.insert(inventory_record.clone()).await?;
420        Ok(())
421    }
422
423    #[instrument(level = "trace", skip(self, db))]
424    async fn add_order(&self, db: &DatabaseConnection, order: &Order) -> Result<(), SimError> {
425        debug!(
426            "Adding order {} on product {}: {}, {:?}",
427            order.id, order.product_id, order.quantity, order.status
428        );
429        self.update_inventory(db, order).await?;
430
431        let mut order = order.clone();
432        order.purchased_on = self.current_day();
433        self.order_cache()?.insert(order.clone()).await?;
434
435        Ok(())
436    }
437
438    #[instrument(level = "trace", skip(self, _db))]
439    async fn add_session(&self, _db: &DatabaseConnection, session: &Session) -> Result<(), SimError> {
440        debug!("Adding session {} for customer {:?}", session.id, session.customer_id);
441        self.session_cache()?.insert(session.clone()).await?;
442        Ok(())
443    }
444
445    #[instrument(level = "trace", skip(self, _db))]
446    async fn check_inventory(
447        &self,
448        _db: &DatabaseConnection,
449        product_id: i32,
450        stock: i64,
451        comment: &str,
452    ) -> Result<(), SimError> {
453        let inventory_record = self.inv_rec_cache()?.get(&product_id).await?;
454
455        self.inv_rec_compare(&inventory_record, product_id, stock, comment)?;
456
457        let inv_check_no = self.inv_check_no() + 1;
458        self.set_inv_check_no(inv_check_no);
459
460        Ok(())
461    }
462
463    #[instrument(level = "trace", skip(self, _db))]
464    async fn update_inventory_record(
465        &self,
466        _db: &DatabaseConnection,
467        product_id: i32,
468        quantity: i64,
469    ) -> Result<(), SimError> {
470        self.inv_rec_cache()?
471            .entry(product_id)
472            .await?
473            .and_try_compute_with(|entry| async {
474                if let Some(entry) = entry {
475                    let mut inventory_record = entry.into_value();
476                    let new_stock = inventory_record.stock + quantity;
477                    if new_stock < 0 {
478                        return Err(simerr!(
479                            "Not enough stock for product ID {}: need {}, but only {} remaining",
480                            product_id,
481                            -quantity,
482                            inventory_record.stock
483                        ));
484                    }
485                    inventory_record.stock = new_stock;
486                    Ok(cache::Op::Put(inventory_record))
487                }
488                else {
489                    Err(simerr!(
490                        "Can't update non-existing inventory record for product ID: {}",
491                        product_id
492                    ))
493                }
494            })
495            .await?;
496
497        Ok(())
498    }
499
500    #[instrument(level = "trace", skip(self, db))]
501    async fn update_order(&self, db: &DatabaseConnection, order_update: &Order) -> Result<(), SimError> {
502        debug!(
503            "Updating order {} on product {}: {}, {:?}",
504            order_update.id, order_update.product_id, order_update.quantity, order_update.status
505        );
506        self.order_cache()?
507            .entry(order_update.id)
508            .await?
509            .and_try_compute_with(|entry| async {
510                if let Some(entry) = entry {
511                    self.update_inventory(db, order_update).await?;
512                    let mut order: Order = entry.into_value();
513                    order.status = order_update.status;
514                    Ok(cache::Op::Put(order))
515                }
516                else {
517                    Err(simerr!("Can't update non-existing order for ID: {}", order_update.id))
518                }
519            })
520            .await?;
521
522        Ok(())
523    }
524
525    #[instrument(level = "trace", skip(self, _db))]
526    async fn update_product_view_count(&self, _db: &DatabaseConnection, product_id: i32) -> Result<(), SimError> {
527        self.product_cache()?
528            .entry(product_id)
529            .await?
530            .and_try_compute_with(|entry| async {
531                if let Some(entry) = entry {
532                    let mut product = entry.into_value();
533                    product.views += 1;
534                    Ok(cache::Op::Put(product))
535                }
536                else {
537                    Err(simerr!("Can't update non-existing product for ID: {}", product_id))
538                }
539            })
540            .await?;
541
542        Ok(())
543    }
544
545    #[instrument(level = "trace", skip(self, _db))]
546    async fn update_session(&self, _db: &DatabaseConnection, session_update: &Session) -> Result<(), SimError> {
547        debug!(
548            "Updating session {} for customer {:?}",
549            session_update.id, session_update.customer_id
550        );
551        self.session_cache()?
552            .entry(session_update.id)
553            .await?
554            .and_try_compute_with(|entry| async {
555                if let Some(entry) = entry {
556                    let mut session = entry.into_value();
557                    session.customer_id = session_update.customer_id;
558                    session.expires_on = session_update.expires_on;
559                    Ok(cache::Op::Put(session))
560                }
561                else {
562                    Err(simerr!(
563                        "Can't update non-existing session for ID: {}",
564                        session_update.id
565                    ))
566                }
567            })
568            .await?;
569
570        Ok(())
571    }
572
573    #[instrument(level = "trace", skip(self, db))]
574    async fn collect_sessions(&self, db: &DatabaseConnection) -> Result<(), SimError> {
575        // It is safe to drop expired sessions that have no user ID set because they cannot become valid.  For sessions
576        // with a user ID, extra caution is required; do not directly delete those that are currently in the cache.
577        self.collect_session_stubs(db).await?;
578
579        let session_cache = self.session_cache()?;
580        let user_sessions = Sessions::find()
581            .select_only()
582            .column(session::Column::Id)
583            .filter(session::Column::CustomerId.is_not_null())
584            .into_tuple::<i64>()
585            .all(db)
586            .await?;
587
588        for session_id in user_sessions {
589            session_cache
590                .entry(session_id)
591                .await?
592                .and_try_compute_with(|entry| async {
593                    if let Some(entry) = entry {
594                        let session = entry.into_value();
595                        if session.expires_on <= self.current_day() {
596                            // Session expired
597                            Ok(cache::Op::Remove)
598                        }
599                        else {
600                            // Session is still valid, do nothing
601                            Ok(cache::Op::Nop)
602                        }
603                    }
604                    else {
605                        // If the session ID was found in the database but not in the cache, it indicates that it was
606                        // previously deleted but hadn't been flushed yet.
607                        // The scenario of a bug in the Cache implementation is not considered here.
608                        Ok(cache::Op::Nop)
609                    }
610                })
611                .await?;
612        }
613
614        Ok(())
615    }
616
617    async fn step_complete(&self, _db: &DatabaseConnection, step_num: usize) -> Result<(), SimError> {
618        let elapsed = self.started().elapsed().as_secs_f64();
619        self.app()?.set_cached_per_sec(step_num as f64 / elapsed);
620        Ok(())
621    }
622
623    #[instrument(level = "trace", skip(self))]
624    async fn curtain_call(&self) -> Result<(), SimError> {
625        self.app()?.report_info("Shutting down caches...");
626
627        self.customer_cache()?.close().await?;
628        self.product_cache()?.close().await?;
629        self.inv_rec_cache()?.close().await?;
630        self.order_cache()?.close().await?;
631        self.session_cache()?.close().await?;
632
633        Ok(())
634    }
635}
636
637impl<APP, D> DBProvider for TestCompany<APP, D>
638where
639    APP: SimulationApp,
640    D: DatabaseDriver,
641{
642    fn db_driver(&self) -> Result<Arc<impl DatabaseDriver>, SimErrorAny> {
643        Ok(self.db())
644    }
645
646    fn db_connection(&self) -> Result<DatabaseConnection, SimErrorAny> {
647        Ok(self.db().connection())
648    }
649}