1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::fmt::Debug;
4use std::sync::Arc;
5use std::time::Duration;
6use std::time::Instant;
7
8use async_trait::async_trait;
9use fieldx_plus::child_build;
10use fieldx_plus::fx_plus;
11use indicatif::ProgressBar;
12use sea_orm::prelude::*;
13use sea_orm::ActiveValue;
14use sea_orm::DatabaseConnection;
15use sea_orm::EntityTrait;
16use sea_orm::QuerySelect;
17use tracing::debug;
18use tracing::instrument;
19
20use crate::cache;
21use crate::traits::Observer;
22use crate::update_iterator::UpdateIterator;
23use crate::Cache;
24
25use super::actor::TestActor;
26use super::db::cache::CacheUpdates;
27use super::db::cache::DBProvider;
28use super::db::driver::DatabaseDriver;
29use super::db::entity::customer::CustomerBy;
30use super::db::entity::session;
31use super::db::entity::*;
32use super::progress::MaybeProgress;
33use super::progress::PStyle;
34use super::scriptwriter::steps::ScriptTitle;
35use super::types::simerr;
36use super::types::OrderStatus;
37use super::types::SimError;
38use super::types::SimErrorAny;
39use super::SimulationApp;
40
41type CustomerCache<APP, D> = Arc<Cache<CustomerMgr<TestCompany<APP, D>>>>;
42type InvRecCache<APP, D> = Arc<Cache<InventoryRecordMgr<TestCompany<APP, D>>>>;
43type OrderCache<APP, D> = Arc<Cache<OrderMgr<TestCompany<APP, D>>>>;
44type ProductCache<APP, D> = Arc<Cache<ProductMgr<TestCompany<APP, D>>>>;
45type SessionCache<APP, D> = Arc<Cache<SessionMgr<TestCompany<APP, D>>>>;
46
47#[fx_plus(child(TestCompany<APP, D>, unwrap), sync)]
48struct OrderObserver<APP, D>
49where
50 APP: SimulationApp,
51 D: DatabaseDriver, {}
52
53#[async_trait]
54impl<APP, D> Observer<OrderMgr<TestCompany<APP, D>>> for OrderObserver<APP, D>
55where
56 APP: SimulationApp,
57 D: DatabaseDriver,
58{
59 async fn on_flush(&self, _updates: Arc<UpdateIterator<OrderMgr<TestCompany<APP, D>>>>) -> Result<(), SimErrorAny> {
60 let parent = self.parent();
61
62 debug!("OrderObserver::on_flush: {}", _updates.len());
66
67 parent.customer_cache()?.flush_raw().await?;
68 Ok(())
69 }
70
71 async fn on_flush_one(
72 &self,
73 key: &Uuid,
74 update: &CacheUpdates<super::db::entity::order::ActiveModel>,
75 ) -> Result<(), Arc<SimErrorAny>> {
76 debug!("OrderObserver::on_flush_one: {}", key);
80
81 match update {
82 CacheUpdates::Insert(am) | CacheUpdates::Update(am) => {
83 let company = self.parent();
84
85 match am.customer_id {
86 ActiveValue::Set(_customer_id) | ActiveValue::Unchanged(_customer_id) => {
87 company.customer_cache()?.flush().await?;
88 }
89 _ => (),
90 }
91 match am.product_id {
92 ActiveValue::Set(_product_id) | ActiveValue::Unchanged(_product_id) => {
93 company.product_cache()?.flush().await?;
94 }
95 _ => (),
96 }
97 }
98 CacheUpdates::Delete => (),
99 }
100 Ok(())
101 }
102
103 async fn on_monitor_error(&self, error: &Arc<SimErrorAny>) {
104 self.parent()
105 .app()
106 .unwrap()
107 .report_error(format!("OrderObserver::on_monitor_error: {error:?}"));
108 }
109
110 async fn on_debug(&self, message: &str) {
111 debug!("[orders] {}", message);
112 }
117}
118
119#[derive(Debug)]
120#[fx_plus(child(TestCompany<APP,D>, unwrap), sync)]
121struct SessionObserver<APP, D>
122where
123 APP: SimulationApp,
124 D: DatabaseDriver, {}
125
126#[async_trait]
127impl<APP, D> Observer<SessionMgr<TestCompany<APP, D>>> for SessionObserver<APP, D>
128where
129 APP: SimulationApp,
130 D: DatabaseDriver,
131{
132 async fn on_flush(&self, updates: Arc<UpdateIterator<SessionMgr<TestCompany<APP, D>>>>) -> Result<(), SimErrorAny> {
133 debug!("SessionObserver::on_flush: {}", updates.len());
137
138 let mut customer_ids: HashSet<CustomerBy> = HashSet::new();
139 while let Some(update) = updates.next() {
140 match update.update() {
141 CacheUpdates::Insert(am) | CacheUpdates::Update(am) => match am.customer_id {
142 ActiveValue::Set(Some(customer_id)) | ActiveValue::Unchanged(Some(customer_id)) => {
143 customer_ids.insert(CustomerBy::Id(customer_id));
144 }
145 _ => (),
146 },
147 CacheUpdates::Delete => (),
148 }
149 }
150 self.parent()
151 .customer_cache()?
152 .flush_many_raw(customer_ids.into_iter().collect::<Vec<_>>())
153 .await?;
154 Ok(())
155 }
156
157 async fn on_flush_one(
158 &self,
159 key: &i64,
160 update: &CacheUpdates<super::db::entity::session::ActiveModel>,
161 ) -> Result<(), Arc<SimErrorAny>> {
162 debug!("SessionObserver::on_flush_one: {}", key);
166
167 match update {
168 CacheUpdates::Insert(am) | CacheUpdates::Update(am) => match am.customer_id {
169 ActiveValue::Set(Some(customer_id)) | ActiveValue::Unchanged(Some(customer_id)) => {
170 self.parent()
171 .customer_cache()?
172 .flush_one(&CustomerBy::Id(customer_id))
173 .await?;
174 }
175 _ => (),
176 },
177 CacheUpdates::Delete => (),
178 }
179 Ok(())
180 }
181
182 async fn on_monitor_error(&self, error: &Arc<SimErrorAny>) {
183 self.parent()
184 .app()
185 .unwrap()
186 .report_error(format!("SessionObserver::on_monitor_error: {error:?}"));
187 }
188
189 async fn on_debug(&self, message: &str) {
190 debug!("[sessions] {}", message);
191 }
196}
197
198#[fx_plus(
199 agent(APP, unwrap(or_else(SimErrorAny, <APP as SimulationApp>::app_is_gone()))),
200 parent,
201 fallible(off, error(SimErrorAny)),
202 sync,
203 default(off)
204)]
205#[allow(clippy::type_complexity)]
206pub struct TestCompany<APP: SimulationApp, D: DatabaseDriver> {
207 #[fieldx(copy, get("_current_day"), inner_mut, set("_set_current_day", private), default(0))]
208 current_day: i32,
209
210 #[fieldx(optional, private, inner_mut, get("_product_count", copy), set, builder(off))]
211 product_count: i32,
212
213 #[fieldx(optional, private, inner_mut, get("_market_capacity", copy), set, builder(off))]
214 market_capacity: u32,
215
216 #[fieldx(lazy, get("_progress", private, clone), fallible)]
217 progress: Arc<Option<ProgressBar>>,
218
219 #[fieldx(get(clone), builder(required))]
220 db: Arc<D>,
221
222 #[fieldx(inner_mut, get(copy), set, default(0))]
223 inv_check_no: u32,
224
225 #[fieldx(inner_mut, get, get_mut, default(HashMap::new()))]
226 updated_from: HashMap<Uuid, Order>,
227
228 #[fieldx(lazy, fallible, get(clone))]
229 customer_cache: CustomerCache<APP, D>,
230
231 #[fieldx(lazy, fallible, get(clone))]
232 inv_rec_cache: InvRecCache<APP, D>,
233
234 #[fieldx(lazy, fallible, get(clone))]
235 order_cache: OrderCache<APP, D>,
236
237 #[fieldx(lazy, fallible, get(clone))]
238 product_cache: ProductCache<APP, D>,
239
240 #[fieldx(lazy, fallible, get(clone))]
241 session_cache: SessionCache<APP, D>,
242
243 #[fieldx(inner_mut, set, get(copy), builder(off), default(Instant::now()))]
244 started: Instant,
245}
246
247impl<APP, D> Debug for TestCompany<APP, D>
248where
249 APP: SimulationApp,
250 D: DatabaseDriver,
251{
252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253 f.debug_struct("TestCompany")
254 .field("current_day", &self.current_day())
255 .field("product_count", &self.product_count())
256 .finish()
257 }
258}
259
260impl<APP: SimulationApp, D: DatabaseDriver> TestCompany<APP, D> {
261 fn build_progress(&self) -> Result<Arc<Option<ProgressBar>>, SimErrorAny> {
262 let app = self.app()?;
263 let progress = app.acquire_progress(PStyle::Main, None)?;
264
265 Ok(Arc::new(progress))
266 }
267
268 fn build_customer_cache(&self) -> Result<CustomerCache<APP, D>, SimErrorAny> {
269 let customer_cache = child_build!(self, CustomerMgr<TestCompany<APP, D>>)?;
270 Ok(Cache::builder()
271 .name("customers")
272 .data_controller(customer_cache)
273 .max_updates(self.market_capacity()? as u64)
274 .max_capacity(self.market_capacity()? as u64)
275 .flush_interval(Duration::from_secs(600))
276 .build()?)
277 }
278
279 fn build_inv_rec_cache(&self) -> Result<InvRecCache<APP, D>, SimErrorAny> {
280 let inv_rec_cache = child_build!(self, InventoryRecordMgr<TestCompany<APP, D>>)?;
281 Ok(Cache::builder()
282 .name("inventory records")
283 .data_controller(inv_rec_cache)
284 .max_updates(self.product_count()? as u64)
285 .max_capacity(self.product_count()? as u64)
286 .flush_interval(Duration::from_secs(600))
287 .build()?)
288 }
289
290 fn build_order_cache(&self) -> Result<OrderCache<APP, D>, SimErrorAny> {
291 let order_cache = child_build!(self, OrderMgr<TestCompany<APP, D>>)?;
292 let order_observer = child_build!(self, OrderObserver<APP,D>)?;
293 Ok(Cache::builder()
296 .name("orders")
297 .data_controller(order_cache)
298 .max_updates(self.market_capacity()? as u64 * 100)
299 .max_capacity((self.market_capacity()? as u64 * 1000).max(1_000_000))
300 .flush_interval(Duration::from_secs(600))
301 .observer(order_observer)
302 .build()?)
303 }
304
305 fn build_product_cache(&self) -> Result<ProductCache<APP, D>, SimErrorAny> {
306 let product_cache = child_build!(self, ProductMgr<TestCompany<APP, D>>)?;
307 Ok(Cache::builder()
308 .name("products")
309 .data_controller(product_cache)
310 .max_updates(self.product_count()? as u64)
311 .max_capacity(self.product_count()? as u64)
312 .flush_interval(Duration::from_secs(60))
313 .build()?)
314 }
315
316 fn build_session_cache(&self) -> Result<SessionCache<APP, D>, SimErrorAny> {
317 let session_cache = child_build!(self, SessionMgr<TestCompany<APP, D>>)?;
318 let session_observer = child_build!(self, SessionObserver<APP,D>)?;
319 Ok(Cache::builder()
320 .name("sessions")
321 .data_controller(session_cache)
322 .max_updates((self.market_capacity()? as u64 * 100).max(100_000))
323 .max_capacity((self.market_capacity()? as u64 * 1000).max(1_000_000))
324 .flush_interval(Duration::from_secs(600))
325 .observer(session_observer)
326 .build()?)
327 }
328
329 fn product_count(&self) -> Result<i32, SimErrorAny> {
330 self._product_count().ok_or_else(|| simerr!("Product count is not set"))
331 }
332
333 fn market_capacity(&self) -> Result<u32, SimErrorAny> {
334 self._market_capacity()
335 .ok_or_else(|| simerr!("Market capacity is not set"))
336 }
337
338 #[instrument(level = "trace", skip(db))]
339 async fn update_inventory(&self, db: &DatabaseConnection, order: &Order) -> Result<(), SimError> {
340 match order.status {
341 OrderStatus::Backordered | OrderStatus::Refunded | OrderStatus::Shipped | OrderStatus::Recheck => (),
342 _ => {
343 if self.updated_from().contains_key(&order.id) {
344 Err(simerr!(
346 "Inventory already updated from order {:?}",
347 self.updated_from().get(&order.id)
348 ))?;
349 }
350 self.updated_from_mut().insert(order.id, order.clone());
351 self.update_inventory_record(db, order.product_id, -(order.quantity as i64))
352 .await?;
353 }
354 }
355 Ok(())
356 }
357}
358
359#[async_trait]
360impl<APP, D> TestActor<APP> for TestCompany<APP, D>
361where
362 APP: SimulationApp,
363 D: DatabaseDriver,
364{
365 fn prelude(&self) -> Result<(), SimError> {
366 self.set_started(Instant::now());
367 self.progress()?.maybe_set_prefix("Cached");
368 Ok(())
369 }
370
371 async fn set_current_day(&self, day: i32) -> Result<(), SimError> {
372 if day == 1 {
373 self.customer_cache()?.flush().await?;
374 self.product_cache()?.flush().await?;
375 self.inv_rec_cache()?.flush().await?;
376 }
377 self._set_current_day(day);
378 self.session_cache()?.soft_flush().await?;
379 self.order_cache()?.soft_flush().await?;
380 Ok(())
381 }
382
383 #[inline(always)]
384 fn current_day(&self) -> i32 {
385 self._current_day()
386 }
387
388 fn progress(&self) -> Result<Arc<Option<ProgressBar>>, SimError> {
389 Ok(self._progress()?)
390 }
391
392 fn set_title(&self, title: &ScriptTitle) -> Result<(), SimError> {
393 self.set_product_count(title.products);
394 self.set_market_capacity(title.market_capacity);
395 Ok(())
396 }
397
398 #[instrument(level = "trace", skip(self, _db))]
399 async fn add_customer(&self, _db: &DatabaseConnection, customer: &Customer) -> Result<(), SimError> {
400 let mut customer = customer.clone();
401 customer.registered_on = self.current_day();
402 self.customer_cache()?.insert(customer.clone()).await?;
403 Ok(())
404 }
405
406 #[instrument(level = "trace", skip(self, _db))]
407 async fn add_product(&self, _db: &DatabaseConnection, product: &Product) -> Result<(), SimError> {
408 self.product_cache()?.insert(product.clone()).await?;
409 Ok(())
410 }
411
412 #[instrument(level = "trace", skip(self, _db))]
413 async fn add_inventory_record(
414 &self,
415 _db: &DatabaseConnection,
416 inventory_record: &InventoryRecord,
417 ) -> Result<(), SimError> {
418 self.product_cache()?.flush().await?;
419 self.inv_rec_cache()?.insert(inventory_record.clone()).await?;
420 Ok(())
421 }
422
423 #[instrument(level = "trace", skip(self, db))]
424 async fn add_order(&self, db: &DatabaseConnection, order: &Order) -> Result<(), SimError> {
425 debug!(
426 "Adding order {} on product {}: {}, {:?}",
427 order.id, order.product_id, order.quantity, order.status
428 );
429 self.update_inventory(db, order).await?;
430
431 let mut order = order.clone();
432 order.purchased_on = self.current_day();
433 self.order_cache()?.insert(order.clone()).await?;
434
435 Ok(())
436 }
437
438 #[instrument(level = "trace", skip(self, _db))]
439 async fn add_session(&self, _db: &DatabaseConnection, session: &Session) -> Result<(), SimError> {
440 debug!("Adding session {} for customer {:?}", session.id, session.customer_id);
441 self.session_cache()?.insert(session.clone()).await?;
442 Ok(())
443 }
444
445 #[instrument(level = "trace", skip(self, _db))]
446 async fn check_inventory(
447 &self,
448 _db: &DatabaseConnection,
449 product_id: i32,
450 stock: i64,
451 comment: &str,
452 ) -> Result<(), SimError> {
453 let inventory_record = self.inv_rec_cache()?.get(&product_id).await?;
454
455 self.inv_rec_compare(&inventory_record, product_id, stock, comment)?;
456
457 let inv_check_no = self.inv_check_no() + 1;
458 self.set_inv_check_no(inv_check_no);
459
460 Ok(())
461 }
462
463 #[instrument(level = "trace", skip(self, _db))]
464 async fn update_inventory_record(
465 &self,
466 _db: &DatabaseConnection,
467 product_id: i32,
468 quantity: i64,
469 ) -> Result<(), SimError> {
470 self.inv_rec_cache()?
471 .entry(product_id)
472 .await?
473 .and_try_compute_with(|entry| async {
474 if let Some(entry) = entry {
475 let mut inventory_record = entry.into_value();
476 let new_stock = inventory_record.stock + quantity;
477 if new_stock < 0 {
478 return Err(simerr!(
479 "Not enough stock for product ID {}: need {}, but only {} remaining",
480 product_id,
481 -quantity,
482 inventory_record.stock
483 ));
484 }
485 inventory_record.stock = new_stock;
486 Ok(cache::Op::Put(inventory_record))
487 }
488 else {
489 Err(simerr!(
490 "Can't update non-existing inventory record for product ID: {}",
491 product_id
492 ))
493 }
494 })
495 .await?;
496
497 Ok(())
498 }
499
500 #[instrument(level = "trace", skip(self, db))]
501 async fn update_order(&self, db: &DatabaseConnection, order_update: &Order) -> Result<(), SimError> {
502 debug!(
503 "Updating order {} on product {}: {}, {:?}",
504 order_update.id, order_update.product_id, order_update.quantity, order_update.status
505 );
506 self.order_cache()?
507 .entry(order_update.id)
508 .await?
509 .and_try_compute_with(|entry| async {
510 if let Some(entry) = entry {
511 self.update_inventory(db, order_update).await?;
512 let mut order: Order = entry.into_value();
513 order.status = order_update.status;
514 Ok(cache::Op::Put(order))
515 }
516 else {
517 Err(simerr!("Can't update non-existing order for ID: {}", order_update.id))
518 }
519 })
520 .await?;
521
522 Ok(())
523 }
524
525 #[instrument(level = "trace", skip(self, _db))]
526 async fn update_product_view_count(&self, _db: &DatabaseConnection, product_id: i32) -> Result<(), SimError> {
527 self.product_cache()?
528 .entry(product_id)
529 .await?
530 .and_try_compute_with(|entry| async {
531 if let Some(entry) = entry {
532 let mut product = entry.into_value();
533 product.views += 1;
534 Ok(cache::Op::Put(product))
535 }
536 else {
537 Err(simerr!("Can't update non-existing product for ID: {}", product_id))
538 }
539 })
540 .await?;
541
542 Ok(())
543 }
544
545 #[instrument(level = "trace", skip(self, _db))]
546 async fn update_session(&self, _db: &DatabaseConnection, session_update: &Session) -> Result<(), SimError> {
547 debug!(
548 "Updating session {} for customer {:?}",
549 session_update.id, session_update.customer_id
550 );
551 self.session_cache()?
552 .entry(session_update.id)
553 .await?
554 .and_try_compute_with(|entry| async {
555 if let Some(entry) = entry {
556 let mut session = entry.into_value();
557 session.customer_id = session_update.customer_id;
558 session.expires_on = session_update.expires_on;
559 Ok(cache::Op::Put(session))
560 }
561 else {
562 Err(simerr!(
563 "Can't update non-existing session for ID: {}",
564 session_update.id
565 ))
566 }
567 })
568 .await?;
569
570 Ok(())
571 }
572
573 #[instrument(level = "trace", skip(self, db))]
574 async fn collect_sessions(&self, db: &DatabaseConnection) -> Result<(), SimError> {
575 self.collect_session_stubs(db).await?;
578
579 let session_cache = self.session_cache()?;
580 let user_sessions = Sessions::find()
581 .select_only()
582 .column(session::Column::Id)
583 .filter(session::Column::CustomerId.is_not_null())
584 .into_tuple::<i64>()
585 .all(db)
586 .await?;
587
588 for session_id in user_sessions {
589 session_cache
590 .entry(session_id)
591 .await?
592 .and_try_compute_with(|entry| async {
593 if let Some(entry) = entry {
594 let session = entry.into_value();
595 if session.expires_on <= self.current_day() {
596 Ok(cache::Op::Remove)
598 }
599 else {
600 Ok(cache::Op::Nop)
602 }
603 }
604 else {
605 Ok(cache::Op::Nop)
609 }
610 })
611 .await?;
612 }
613
614 Ok(())
615 }
616
617 async fn step_complete(&self, _db: &DatabaseConnection, step_num: usize) -> Result<(), SimError> {
618 let elapsed = self.started().elapsed().as_secs_f64();
619 self.app()?.set_cached_per_sec(step_num as f64 / elapsed);
620 Ok(())
621 }
622
623 #[instrument(level = "trace", skip(self))]
624 async fn curtain_call(&self) -> Result<(), SimError> {
625 self.app()?.report_info("Shutting down caches...");
626
627 self.customer_cache()?.close().await?;
628 self.product_cache()?.close().await?;
629 self.inv_rec_cache()?.close().await?;
630 self.order_cache()?.close().await?;
631 self.session_cache()?.close().await?;
632
633 Ok(())
634 }
635}
636
637impl<APP, D> DBProvider for TestCompany<APP, D>
638where
639 APP: SimulationApp,
640 D: DatabaseDriver,
641{
642 fn db_driver(&self) -> Result<Arc<impl DatabaseDriver>, SimErrorAny> {
643 Ok(self.db())
644 }
645
646 fn db_connection(&self) -> Result<DatabaseConnection, SimErrorAny> {
647 Ok(self.db().connection())
648 }
649}