wb_cache/test/simulation/scriptwriter/
reporter.rs

1use std::cell::RefCell;
2use std::sync::atomic::AtomicI32;
3use std::sync::Arc;
4use std::thread;
5use std::thread::JoinHandle;
6use std::time::Duration;
7use std::time::Instant;
8
9use console::style;
10use console::Term;
11use fieldx_plus::fx_plus;
12use num_format::utils::DecimalStr;
13use num_format::utils::InfinityStr;
14use num_format::utils::MinusSignStr;
15use num_format::utils::NanStr;
16use num_format::utils::PlusSignStr;
17use num_format::utils::SeparatorStr;
18use num_format::Format;
19use num_format::Locale;
20use num_format::SystemLocale;
21use num_format::ToFormattedString;
22
23use crate::test::simulation::types::simerr;
24use crate::test::simulation::types::Result;
25use crate::test::simulation::types::SimErrorAny;
26
27use super::ScriptWriter;
28
/// Animation frames (braille spinner glyphs) for the activity ticker drawn at the start of the worker status line.
/// The frame is selected by the refresher loop counter modulo the number of frames.
const TICKER_CHARS: &[char] = &['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
30
/// ScriptWriter reporter trait.
///
/// Abstracts progress reporting so that callers can feed status updates and messages without knowing whether
/// anything is actually displayed. Implementations must be shareable across threads (`Send + Sync + 'static`).
pub trait SwReporter: Send + Sync + 'static {
    /// Reports a text message.
    fn out(&self, msg: &str) -> Result<()>;
    /// Redraws/re-emits the current report.
    fn refresh_report(&self) -> Result<()>;
    /// Records the current number of backorders.
    fn set_backorders(&self, backorders: usize);
    /// Records the current number of pending orders.
    fn set_pending_orders(&self, orders: usize);
    /// Records the status of the random pool task.
    fn set_rnd_pool_task_status(&self, status: TaskStatus);
    /// Records the scenario capacity.
    fn set_scenario_capacity(&self, capacity: usize);
    /// Records the number of scenario lines.
    fn set_scenario_lines(&self, lines: usize);
    /// Sets how many tasks are tracked.
    fn set_task_count(&self, count: usize);
    /// Records the status of the task identified by `task_id`.
    fn set_task_status(&self, task_id: usize, status: TaskStatus);
    /// Starts reporting.
    fn start(&self) -> Result<()>;
    /// Stops reporting.
    fn stop(&self) -> Result<()>;
}
45
/// Reporter selector: either a shared [`FormattedReporter`] or a silent no-op.
///
/// Cloning the `Formatted` variant clones the `Arc`, so all clones share one reporter instance.
#[derive(Clone)]
pub enum Reporter {
    /// Forward every call to the wrapped [`FormattedReporter`].
    Formatted(Arc<FormattedReporter>),
    /// Ignore every call; fallible methods return `Ok(())`.
    Quiet,
}
51
52impl SwReporter for Reporter {
53    fn out(&self, msg: &str) -> Result<()> {
54        match self {
55            Reporter::Formatted(reporter) => reporter.out(msg),
56            Reporter::Quiet => Ok(()),
57        }
58    }
59
60    fn refresh_report(&self) -> Result<()> {
61        match self {
62            Reporter::Formatted(reporter) => reporter.refresh_report(),
63            Reporter::Quiet => Ok(()),
64        }
65    }
66
67    fn set_backorders(&self, backorders: usize) {
68        if let Reporter::Formatted(reporter) = self {
69            reporter.set_backorders(backorders);
70        }
71    }
72
73    fn set_pending_orders(&self, orders: usize) {
74        if let Reporter::Formatted(reporter) = self {
75            reporter.set_pending_orders(orders);
76        }
77    }
78
79    fn set_rnd_pool_task_status(&self, status: TaskStatus) {
80        if let Reporter::Formatted(reporter) = self {
81            reporter.set_rnd_pool_task_status(status);
82        }
83    }
84
85    fn set_scenario_capacity(&self, capacity: usize) {
86        if let Reporter::Formatted(reporter) = self {
87            reporter.set_scenario_capacity(capacity);
88        }
89    }
90
91    fn set_scenario_lines(&self, lines: usize) {
92        if let Reporter::Formatted(reporter) = self {
93            reporter.set_scenario_lines(lines);
94        }
95    }
96
97    fn set_task_count(&self, count: usize) {
98        if let Reporter::Formatted(reporter) = self {
99            reporter.set_task_count(count);
100        }
101    }
102
103    fn set_task_status(&self, task_id: usize, status: TaskStatus) {
104        if let Reporter::Formatted(reporter) = self {
105            reporter.set_task_status(task_id, status);
106        }
107    }
108
109    fn start(&self) -> Result<()> {
110        match self {
111            Reporter::Formatted(reporter) => reporter.start(),
112            Reporter::Quiet => Ok(()),
113        }
114    }
115
116    fn stop(&self) -> Result<()> {
117        match self {
118            Reporter::Formatted(reporter) => reporter.stop(),
119            Reporter::Quiet => Ok(()),
120        }
121    }
122}
123
/// Number formatting source used by `format_num`: either the system locale or an explicitly chosen
/// `num_format` locale.
enum NumFormatter {
    /// Locale obtained from the system (boxed).
    Sys(Box<SystemLocale>),
    /// A fixed, built-in `num_format` locale.
    Explicit(Locale),
}
128
129thread_local! {
130    static NUM_LOCALE: RefCell<NumFormatter> = {
131        RefCell::new(SystemLocale::default()
132            .map_or_else(
133                |_| NumFormatter::Explicit(Locale::en),
134                |sl| NumFormatter::Sys(Box::new(sl)),
135            ))
136    };
137}
138
139impl Format for NumFormatter {
140    fn decimal(&self) -> num_format::utils::DecimalStr<'_> {
141        DecimalStr::new(match self {
142            NumFormatter::Sys(locale) => locale.decimal(),
143            NumFormatter::Explicit(locale) => locale.decimal(),
144        })
145        .unwrap()
146    }
147
148    fn grouping(&self) -> num_format::Grouping {
149        match self {
150            NumFormatter::Sys(locale) => locale.grouping(),
151            NumFormatter::Explicit(locale) => locale.grouping(),
152        }
153    }
154
155    fn infinity(&self) -> num_format::utils::InfinityStr<'_> {
156        InfinityStr::new(match self {
157            NumFormatter::Sys(locale) => locale.infinity(),
158            NumFormatter::Explicit(locale) => locale.infinity(),
159        })
160        .unwrap()
161    }
162
163    fn minus_sign(&self) -> num_format::utils::MinusSignStr<'_> {
164        MinusSignStr::new(match self {
165            NumFormatter::Sys(locale) => locale.minus_sign(),
166            NumFormatter::Explicit(locale) => locale.minus_sign(),
167        })
168        .unwrap()
169    }
170
171    fn nan(&self) -> num_format::utils::NanStr<'_> {
172        NanStr::new(match self {
173            NumFormatter::Sys(locale) => locale.nan(),
174            NumFormatter::Explicit(locale) => locale.nan(),
175        })
176        .unwrap()
177    }
178
179    fn plus_sign(&self) -> num_format::utils::PlusSignStr<'_> {
180        PlusSignStr::new(match self {
181            NumFormatter::Sys(locale) => locale.plus_sign(),
182            NumFormatter::Explicit(locale) => locale.plus_sign(),
183        })
184        .unwrap()
185    }
186
187    fn separator(&self) -> num_format::utils::SeparatorStr<'_> {
188        SeparatorStr::new(match self {
189            NumFormatter::Sys(locale) => locale.separator(),
190            NumFormatter::Explicit(locale) => locale.separator(),
191        })
192        .unwrap()
193    }
194}
195
/// Activity state of a task as shown in the worker status line.
#[derive(Clone, Copy, Debug)]
pub enum TaskStatus {
    /// The task is not doing work; drawn as an empty box (`□`).
    Idle,
    /// The task is doing work; drawn as a filled box (`■`).
    Busy,
}
201
/// Reporter that renders a live status header, a worker activity line and queued messages to the terminal.
///
/// The `fx_plus`/`fieldx` attributes generate the accessors, setters and locking wrappers used by the `impl`
/// blocks in this file (e.g. `task_status()`, `set_shutdown()`, `term_mut()`, `parent()`, `myself()`).
/// NOTE(review): the exact semantics of each attribute option are defined by the fieldx crates and are not
/// visible here — consult their documentation before changing any of them.
#[fx_plus(child(ScriptWriter, unwrap(or_else(SimErrorAny, no_scenario))), sync, rc)]
pub struct FormattedReporter {
    /// Status of each tracked task, indexed by task id; resized by `set_task_count`.
    #[fieldx(private, lock, get, get_mut, builder(off), default(Vec::new()))]
    task_status: Vec<TaskStatus>,

    /// Status of the random pool task; drawn first inside the worker brackets.
    #[fieldx(private, lock, get(copy), get_mut, builder(off), default(TaskStatus::Idle))]
    rnd_pool_task: TaskStatus,

    /// Most recently rendered worker status line (produced by `refresh_task_status`).
    #[fieldx(private, lock, get, set, builder(off))]
    task_line: String,

    /// Join handle of the background refresher thread spawned by `start`; taken and joined by `stop`.
    #[fieldx(lock, clearer, private, set, get(off))]
    refresher_handler: JoinHandle<Result<()>>,

    /// Flag polled by the refresher loop; set to `true` by `stop` to make the loop exit.
    #[fieldx(private, lock, set, get(copy), builder(off))]
    shutdown: bool,

    /// Number of scenario lines shown in the status report.
    #[fieldx(lock, set("_set_scenario_lines"), get(copy), builder(off), default(0))]
    scenario_lines: usize,

    /// Scenario capacity shown in the status report; also the denominator of the fill percentage.
    #[fieldx(lock, set("_set_scenario_capacity"), get(copy), builder(off), default(0))]
    scenario_capacity: usize,

    /// Number of pending orders shown in the status report.
    #[fieldx(lock, set("_set_pending_orders"), get(copy), builder(off), default(0))]
    pending_orders: usize,

    /// Number of backorders shown in the status report.
    #[fieldx(lock, set("_set_backorders"), get(copy), builder(off), default(0))]
    backorders: usize,

    /// Terminal handle; built by `build_term` as a buffered stdout terminal.
    #[fieldx(lazy, get_mut, private, builder(off))]
    term: Term,

    /// Result of `console::user_attended()`, built by `build_user_attended`; selects between the interactive
    /// terminal rendering and plain `println!` output.
    #[fieldx(lazy, get(copy), builder(off))]
    user_attended: bool,

    /// Interval between automatic refreshes performed by the refresher thread (100 ms by default).
    #[fieldx(get(copy), default(Duration::from_millis(100)))]
    refresh_interval: Duration,

    /// Moment the last `refresh_report` call completed; used by the refresher to skip redundant refreshes.
    #[fieldx(inner_mut, set, get(copy), builder(off), default(Instant::now()))]
    last_refresh: Instant,

    /// Number of refresher loop iterations; selects the ticker animation frame.
    #[fieldx(inner_mut, get, get_mut, builder(off), default(AtomicI32::new(0)))]
    refresher_count: AtomicI32,

    /// Number of `set_task_status` calls; displayed as "State transitions".
    #[fieldx(inner_mut, get, get_mut, builder(off), default(AtomicI32::new(0)))]
    task_changes: AtomicI32,

    /// Messages queued by `out` and not yet printed.
    #[fieldx(lock, set, get, get_mut, builder(off), default(Vec::new()))]
    messages: Vec<String>,
}
252
253impl FormattedReporter {
254    fn build_term(&self) -> Term {
255        Term::buffered_stdout()
256    }
257
258    fn build_user_attended(&self) -> bool {
259        console::user_attended()
260    }
261
262    fn no_scenario(&self) -> SimErrorAny {
263        simerr!("Scenario object is gone!")
264    }
265
266    fn status_report(&self) -> Result<Vec<String>> {
267        let scenario = self.parent()?;
268        let mut status_lines = Vec::new();
269        status_lines.push(format!("Day            {:>11}", scenario.current_day()));
270        status_lines.push(format!("Customers      {:>11}", format_num(scenario.customers().len())));
271        status_lines.push(format!(
272            "Pending orders {:>11} | Backordered {:>11}",
273            format_num(self.pending_orders()),
274            format_num(self.backorders())
275        ));
276        status_lines.push(format!(
277            "Scenario lines {:>11} | Capacity {:>11} ({}%)",
278            format_num(self.scenario_lines()),
279            format_num(self.scenario_capacity()),
280            (self.scenario_lines() as f64 / self.scenario_capacity() as f64 * 100.0).round()
281        ));
282        Ok(status_lines)
283    }
284
285    fn spurt_messages(&self, term: &Term, header_height: usize) -> Result<()> {
286        let messages = self.set_messages(Vec::new());
287        let msg_count = messages.len();
288        if self.user_attended() {
289            let (_, height) = term.size();
290            let height = height as usize - header_height;
291            let first_msg = msg_count.max(height) - height;
292            for msg in &messages[first_msg..] {
293                term.clear_line()?;
294                term.write_line(&msg.to_string())?;
295            }
296        }
297        else {
298            for msg in messages {
299                println!("{msg}");
300            }
301        }
302        Ok(())
303    }
304
305    fn refresher(&self) -> Result<()> {
306        let interval = self.refresh_interval();
307        let mut next_interval = interval;
308        loop {
309            if self.shutdown() {
310                break;
311            }
312            self.refresher_count_mut()
313                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
314            thread::sleep(next_interval);
315            let duration = self.last_refresh().elapsed();
316            if duration >= interval {
317                self.refresh_report()?;
318                next_interval = interval;
319            }
320            else {
321                // If a refresh took place while we slept then sleep the remaining time to make it a full interval.
322                next_interval = interval - duration;
323            }
324        }
325
326        Ok(())
327    }
328
329    fn refresh_task_status(&self) {
330        let statuses = self.task_status();
331        let task_box_busy = style("■").on_black();
332        let task_box_idle = style("□").on_black();
333        let r_idx = self.refresher_count().load(std::sync::atomic::Ordering::Relaxed) as usize % TICKER_CHARS.len();
334        let task_rotator = style(format!("{} ", TICKER_CHARS[r_idx])).cyan().dim();
335        let task_changes = format_num(self.task_changes().load(std::sync::atomic::Ordering::Relaxed));
336        let rnd_pool_status = match self.rnd_pool_task() {
337            TaskStatus::Idle => task_box_idle.clone().white().dim(),
338            TaskStatus::Busy => task_box_busy.clone().magenta().bright(),
339        };
340        self.set_task_line(format!(
341            "{}Workers [{}{}] State transitions: {}",
342            task_rotator,
343            rnd_pool_status,
344            statuses
345                .iter()
346                .map(|s| {
347                    let tb = match s {
348                        TaskStatus::Idle => task_box_idle.clone().cyan().dim(),
349                        TaskStatus::Busy => task_box_busy.clone().yellow(),
350                    };
351                    tb.to_string()
352                })
353                .collect::<Vec<_>>()
354                .join(""),
355            task_changes
356        ));
357    }
358}
359
360impl SwReporter for FormattedReporter {
361    fn start(&self) -> Result<()> {
362        self.term().clear_screen().unwrap();
363        self.term().hide_cursor().unwrap();
364        self.set_shutdown(false);
365        if self.user_attended() {
366            let myself = self
367                .myself()
368                .ok_or(simerr!("Failed to get myself for reporter object"))?;
369            self.set_refresher_handler(
370                thread::Builder::new()
371                    .name("refresher".to_string())
372                    .spawn(move || myself.refresher())?,
373            );
374        }
375
376        Ok(())
377    }
378
379    fn stop(&self) -> Result<()> {
380        self.set_shutdown(true);
381        if let Some(handle) = self.clear_refresher_handler() {
382            if let Err(err) = handle.join() {
383                eprintln!("Screen updater thread failed: {err:?}");
384            }
385        }
386        self.term().show_cursor().unwrap();
387        Ok(())
388    }
389
390    fn out(&self, msg: &str) -> Result<()> {
391        self.messages_mut().push(msg.to_string());
392        if !self.user_attended() && self.messages().len() > 20 {
393            self.spurt_messages(&self.term(), 0)?;
394        }
395        Ok(())
396    }
397
398    fn set_task_count(&self, count: usize) {
399        self.task_status_mut().resize(count, TaskStatus::Idle);
400    }
401
402    fn set_task_status(&self, task_id: usize, status: TaskStatus) {
403        self.task_changes_mut()
404            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
405        if task_id < self.task_status().len() {
406            self.task_status_mut()[task_id] = status;
407        }
408    }
409
410    fn set_rnd_pool_task_status(&self, status: TaskStatus) {
411        *self.rnd_pool_task_mut() = status;
412    }
413
414    fn refresh_report(&self) -> Result<()> {
415        let mut status_lines = self.status_report()?;
416        let (t_height, t_width) = self.term().size();
417        status_lines.push(style("─".repeat(t_width as usize)).cyan().dim().to_string());
418
419        let header_height = status_lines.len();
420        let term = self.term_mut();
421        if self.user_attended() {
422            term.move_cursor_to(0, t_height as usize - 1)?;
423        }
424
425        self.refresh_task_status();
426        self.spurt_messages(&term, header_height)?;
427
428        if self.user_attended() {
429            term.move_cursor_to(0, header_height)?;
430            term.clear_last_lines(header_height)?;
431            term.write_line(&self.task_line())?;
432            for line in status_lines {
433                term.clear_line()?;
434                term.write_line(&line)?;
435            }
436            term.move_cursor_to(0, t_height as usize - 1)?;
437            term.flush()?;
438        }
439        else {
440            for line in status_lines {
441                println!("{line}");
442            }
443        }
444
445        self.set_last_refresh(Instant::now());
446        Ok(())
447    }
448
449    fn set_scenario_lines(&self, lines: usize) {
450        self._set_scenario_lines(lines);
451    }
452
453    fn set_scenario_capacity(&self, capacity: usize) {
454        self._set_scenario_capacity(capacity);
455    }
456
457    fn set_backorders(&self, backorders: usize) {
458        self._set_backorders(backorders);
459    }
460
461    fn set_pending_orders(&self, orders: usize) {
462        self._set_pending_orders(orders);
463    }
464}
465
466fn format_num<N: ToFormattedString>(num: N) -> String {
467    NUM_LOCALE.with_borrow(|l| num.to_formatted_string(l))
468}