Skip to main content

tpcgen_cli/tpch_cli/
progress.rs

//! Progress tracking for table generation.
//!
//! # Overview
//!
//! [`ProgressTracker`] is a small, dyn-compatible trait that receives
//! generation events from [`crate::tpch_cli::runner::PlanRunner`]. The runner calls:
//!
//! 1. [`ProgressTracker::register`] once per table, before any worker
//!    starts, with the total number of output units the table will produce
//!    (chunks for TBL/CSV, row groups for Parquet).
//! 2. [`ProgressTracker::increment`] after output units are written.
//!    Multiple table-generation tasks may call it concurrently, so impls
//!    must be `Send + Sync` and `increment` itself should be lightweight.
//! 3. [`ProgressTracker::finish`] once on the success path when the
//!    runner exits. Implementations should use `finish` for normal
//!    success cleanup and `Drop` only as an error or panic fallback.
//!
//! `register` and `finish` are invoked serially by the runner and may
//! do bookkeeping or I/O; `increment` may run concurrently while output
//! is being written.
//!
//! Implementations must not panic and must not propagate I/O errors —
//! progress reporting is best-effort and must never affect the data
//! path.
//!
//! # Default implementation
//!
//! When the `indicatif-progress` feature is enabled (on by default), the
//! crate provides an `IndicatifProgress` implementation, which renders
//! one progress bar per table on stderr using the `indicatif` crate.
//! Library users who do not want to pull in `indicatif` can disable default
//! features and still supply their own [`ProgressTracker`] implementation.
//! Without `indicatif-progress` and without a custom tracker, progress
//! reporting is a no-op.
//!
//! # Example: a custom logging tracker
//!
//! ```
//! use std::sync::atomic::{AtomicU64, Ordering};
//! use tpcgen_cli::tpch_cli::progress::ProgressTracker;
//! use tpcgen_cli::tpch_cli::Table;
//!
//! #[derive(Debug)]
//! struct LoggingTracker {
//!     written: AtomicU64,
//! }
//!
//! impl ProgressTracker for LoggingTracker {
//!     fn register(&self, table: Table, total: u64) {
//!         eprintln!("plan: {table:?} -> {total} output units");
//!     }
//!     fn increment(&self, _table: Table, units: u64) {
//!         self.written.fetch_add(units, Ordering::Relaxed);
//!     }
//!     fn finish(&self) {
//!         eprintln!("done: {} output units", self.written.load(Ordering::Relaxed));
//!     }
//! }
//! ```
60
61use crate::tpch_cli::output_plan::OutputPlan;
62use crate::tpch_cli::Table;
63use std::collections::BTreeMap;
64use std::fmt;
65use std::sync::Arc;
66
67/// Receives generation-progress events for one
68/// [`PlanRunner`](crate::tpch_cli::runner::PlanRunner) invocation.
69///
70/// See the [module-level documentation](self) for the call-order
71/// contract. Trackers are passed to the runner as an
72/// [`std::sync::Arc`] and shared across concurrent generation tasks, so
73/// they must be `Send + Sync`.
74/// They must also be `Debug` so containing types can derive `Debug`.
75pub trait ProgressTracker: Send + Sync + fmt::Debug {
76    /// Pre-register a table with its total expected output-unit count.
77    ///
78    /// Called once per table before any worker starts. Implementations
79    /// that need to know totals up front (e.g. to render a progress bar
80    /// or compute an ETA) should override this; the default does
81    /// nothing.
82    fn register(&self, _table: Table, _total_units: u64) {}
83
84    /// Advance the counter for `table` by `units` output units.
85    ///
86    /// Called after generated output units are written. Multiple
87    /// table-generation tasks may call this concurrently, so implementations
88    /// should be lightweight and must never panic.
89    fn increment(&self, table: Table, units: u64);
90
91    /// Called once after the last [`Self::increment`] on the success
92    /// path. Implementations should use this for normal success cleanup
93    /// and `Drop` only as an error or panic fallback. The default does
94    /// nothing.
95    fn finish(&self) {}
96}
97
98/// Progress handle for one [`PlanRunner::run`](crate::tpch_cli::runner::PlanRunner::run)
99/// invocation.
100///
101/// Owns run-level progress lifecycle: registering totals, accounting for
102/// skipped outputs, and finishing the tracker.
103#[derive(Debug, Clone, Default)]
104pub(crate) struct RunProgress {
105    tracker: Option<Arc<dyn ProgressTracker>>,
106}
107
108impl RunProgress {
109    pub(crate) fn with_tracker(tracker: Arc<dyn ProgressTracker>) -> Self {
110        Self {
111            tracker: Some(tracker),
112        }
113    }
114
115    pub(crate) fn register_totals(&self, plans: &[OutputPlan]) {
116        if let Some(tracker) = self.tracker.as_ref() {
117            let mut totals: BTreeMap<Table, u64> = BTreeMap::new();
118            for plan in plans {
119                *totals.entry(plan.table()).or_insert(0) += plan.chunk_count() as u64;
120            }
121            for (table, total) in totals {
122                tracker.register(table, total);
123            }
124        }
125    }
126
127    pub(crate) fn increment_for_existing(&self, plan: &OutputPlan) {
128        if let Some(tracker) = self.tracker.as_ref() {
129            tracker.increment(plan.table(), plan.chunk_count() as u64);
130        }
131    }
132
133    pub(crate) fn for_table(&self, table: Table) -> TableProgress {
134        TableProgress::for_table(self.tracker.clone(), table)
135    }
136
137    pub(crate) fn finish(self) {
138        if let Some(tracker) = self.tracker {
139            tracker.finish();
140        }
141    }
142}
143
144/// Progress handle for one table output stream.
145///
146/// Used by format writers to report each successfully written output unit
147/// without knowing whether progress tracking is enabled.
148#[derive(Clone, Default)]
149pub(crate) struct TableProgress {
150    tracker: Option<(Arc<dyn ProgressTracker>, Table)>,
151}
152
153impl TableProgress {
154    pub(crate) fn for_table(progress: Option<Arc<dyn ProgressTracker>>, table: Table) -> Self {
155        Self {
156            tracker: progress.map(|progress| (progress, table)),
157        }
158    }
159
160    pub(crate) fn increment_output_unit(&self) {
161        if let Some((progress, table)) = self.tracker.as_ref() {
162            progress.increment(*table, 1);
163        }
164    }
165}
166
167#[cfg(feature = "indicatif-progress")]
168pub use indicatif_impl::IndicatifProgress;
169
#[cfg(feature = "indicatif-progress")]
mod indicatif_impl {
    use super::ProgressTracker;
    use crate::tpch_cli::Table;
    use indicatif::{MultiProgress, ProgressBar, ProgressFinish, ProgressStyle};
    use std::collections::BTreeMap;
    use std::io::{self, Write};
    use std::sync::{OnceLock, RwLock};

    /// Default [`ProgressTracker`] implementation backed by
    /// [`indicatif::MultiProgress`].
    ///
    /// Renders one bar per table on stderr. Bars are pre-allocated in
    /// [`ProgressTracker::register`] and are looked up by [`Table`] on
    /// each [`ProgressTracker::increment`] call. Lookup uses a `RwLock`
    /// read on the increment path; this is uncontended after the serial
    /// `register` phase completes.
    ///
    /// If the lock is ever poisoned, `register`, `increment` and `finish`
    /// silently return: progress reporting is best-effort and must never
    /// panic.
    #[derive(Debug)]
    pub struct IndicatifProgress {
        /// Owner of all bars; also used to coordinate log output.
        multi: MultiProgress,
        /// One bar per registered table.
        tables: RwLock<BTreeMap<Table, ProgressBar>>,
    }

    impl IndicatifProgress {
        /// Construct an empty tracker. Tables are added via
        /// [`ProgressTracker::register`].
        pub fn new() -> Self {
            Self {
                multi: MultiProgress::new(),
                tables: RwLock::new(BTreeMap::new()),
            }
        }

        /// Return a writer that coordinates stderr log writes with progress
        /// bar redraws.
        ///
        /// The writer holds its own handle to the underlying
        /// `MultiProgress`, so it does not borrow from `self`.
        pub fn log_writer(&self) -> Box<dyn io::Write + Send + 'static> {
            Box::new(IndicatifLogWriter {
                multi: self.multi.clone(),
            })
        }
    }

    impl Default for IndicatifProgress {
        fn default() -> Self {
            Self::new()
        }
    }

    impl ProgressTracker for IndicatifProgress {
        /// Adds a bar of length `total_units` for `table`.
        ///
        /// Registering a table that already has a bar replaces that bar;
        /// the previous bar is detached from the `MultiProgress` so it
        /// does not linger on screen as an orphan that `increment` and
        /// `finish` can no longer reach.
        fn register(&self, table: Table, total_units: u64) {
            // Write-lock is only contended during the register phase, which
            // happens serially before any worker task starts.
            let Ok(mut tables) = self.tables.write() else {
                return;
            };

            let pb = self.multi.add(ProgressBar::new(total_units));
            pb.set_style(bar_style().clone());
            pb.set_message(table.to_string());
            let pb = pb.with_finish(ProgressFinish::AndLeave);

            if let Some(stale) = tables.insert(table, pb) {
                // Detach the replaced bar before it is dropped.
                self.multi.remove(&stale);
            }
        }

        /// Advances the bar of `table` by `units`; unknown tables are
        /// ignored.
        fn increment(&self, table: Table, units: u64) {
            // Minimize the read-lock scope so concurrent `increment` callers
            // don't serialize on it. Cloning the bar is a cheap `Arc` bump,
            // and `ProgressBar::inc` is internally thread-safe.
            let bar = {
                let Ok(tables) = self.tables.read() else {
                    return;
                };
                tables.get(&table).cloned()
            };
            if let Some(bar) = bar {
                bar.inc(units);
            }
        }

        /// Finishes every registered bar using the finish behavior
        /// configured in `register`.
        fn finish(&self) {
            // Snapshot the bars first so the lock is not held while
            // indicatif finishes them.
            let bars = {
                let Ok(tables) = self.tables.read() else {
                    return;
                };
                tables.values().cloned().collect::<Vec<_>>()
            };
            for bar in bars {
                bar.finish_using_style();
            }
        }
    }

    /// Returns the bar style shared by all tables, built on first use.
    fn bar_style() -> &'static ProgressStyle {
        static STYLE: OnceLock<ProgressStyle> = OnceLock::new();
        STYLE.get_or_init(|| {
            ProgressStyle::default_bar()
                .template("{msg:10} [{bar:28}]   Progress: {percent:>3}%")
                .expect("static progress bar template is valid")
                .progress_chars("█▓░")
        })
    }

    /// `io::Write` adapter that forwards to stderr inside
    /// [`MultiProgress::suspend`], so log output and bar redraws do not
    /// interleave.
    struct IndicatifLogWriter {
        multi: MultiProgress,
    }

    impl Write for IndicatifLogWriter {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.multi.suspend(|| {
                let mut stderr = io::stderr().lock();
                stderr.write(buf)
            })
        }

        fn flush(&mut self) -> io::Result<()> {
            self.multi.suspend(|| {
                let mut stderr = io::stderr().lock();
                stderr.flush()
            })
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn registers_and_increments() {
            let t = IndicatifProgress::new();
            t.register(Table::Lineitem, 60);
            t.register(Table::Orders, 15);
            t.increment(Table::Lineitem, 1);
            t.increment(Table::Orders, 5);

            let tables = t.tables.read().unwrap();
            assert_eq!(tables[&Table::Lineitem].position(), 1);
            assert_eq!(tables[&Table::Orders].position(), 5);
        }

        #[test]
        fn reaches_total() {
            let t = IndicatifProgress::new();
            t.register(Table::Orders, 5);
            for _ in 0..5 {
                t.increment(Table::Orders, 1);
            }
            assert_eq!(t.tables.read().unwrap()[&Table::Orders].position(), 5);
        }

        #[test]
        fn unknown_table_is_no_op() {
            // Incrementing a table not registered must not panic.
            let t = IndicatifProgress::new();
            t.register(Table::Orders, 1);
            t.increment(Table::Lineitem, 1);
            assert_eq!(t.tables.read().unwrap()[&Table::Orders].position(), 0);
        }

        #[test]
        fn finish_marks_registered_bars_finished() {
            let t = IndicatifProgress::new();
            t.register(Table::Orders, 2);
            t.increment(Table::Orders, 2);
            t.finish();

            assert!(t.tables.read().unwrap()[&Table::Orders].is_finished());
        }

        #[test]
        fn re_registering_replaces_the_bar() {
            // A second `register` for the same table must leave exactly one
            // tracked bar, starting from zero again.
            let t = IndicatifProgress::new();
            t.register(Table::Orders, 5);
            t.increment(Table::Orders, 3);
            t.register(Table::Orders, 8);

            let tables = t.tables.read().unwrap();
            assert_eq!(tables.len(), 1);
            assert_eq!(tables[&Table::Orders].position(), 0);
        }
    }
}
338
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tpch_cli::Table;
    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc, Mutex,
    };

    /// Test double that records every event it receives. It shows that
    /// the trait is dyn-compatible and can be implemented without
    /// depending on `indicatif`.
    #[derive(Debug, Default)]
    struct MockTracker {
        /// `(table, total)` pairs in the order `register` saw them.
        registered: Mutex<Vec<(Table, u64)>>,
        /// Sum of all `units` passed to `increment`.
        total_increments: AtomicU64,
        /// Number of `finish` calls.
        finished: AtomicU64,
    }

    impl ProgressTracker for MockTracker {
        fn register(&self, table: Table, total_units: u64) {
            let mut log = self.registered.lock().unwrap();
            log.push((table, total_units));
        }

        fn increment(&self, _table: Table, units: u64) {
            self.total_increments.fetch_add(units, Ordering::Relaxed);
        }

        fn finish(&self) {
            self.finished.fetch_add(1, Ordering::Relaxed);
        }
    }

    #[test]
    fn mock_tracker_works_through_arc_dyn() {
        let recorder = Arc::new(MockTracker::default());
        // Drive the mock exclusively through the trait object.
        let tracker: Arc<dyn ProgressTracker> = recorder.clone();

        tracker.register(Table::Lineitem, 10);
        tracker.register(Table::Orders, 4);
        tracker.increment(Table::Lineitem, 3);
        tracker.increment(Table::Orders, 1);
        tracker.finish();

        let expected_registrations = vec![(Table::Lineitem, 10), (Table::Orders, 4)];
        assert_eq!(*recorder.registered.lock().unwrap(), expected_registrations);

        let incremented = recorder.total_increments.load(Ordering::Relaxed);
        assert_eq!(incremented, 4);

        let finish_calls = recorder.finished.load(Ordering::Relaxed);
        assert_eq!(finish_calls, 1);
    }

    #[test]
    fn default_register_and_finish_are_noops() {
        // Overriding only `increment` must be enough to implement the trait.
        #[derive(Debug)]
        struct Minimal(AtomicU64);

        impl ProgressTracker for Minimal {
            fn increment(&self, _t: Table, c: u64) {
                self.0.fetch_add(c, Ordering::Relaxed);
            }
        }

        let minimal = Minimal(AtomicU64::new(0));
        // Provided default: nothing happens.
        minimal.register(Table::Region, 99);
        minimal.increment(Table::Region, 7);
        // Provided default: nothing happens.
        minimal.finish();

        let counted = minimal.0.load(Ordering::Relaxed);
        assert_eq!(counted, 7);
    }
}