tpcgen_cli/tpch_cli/progress.rs
1//! Progress tracking for table generation.
2//!
3//! # Overview
4//!
5//! [`ProgressTracker`] is a small, dyn-compatible trait that receives
6//! generation events from [`crate::tpch_cli::runner::PlanRunner`]. The runner calls:
7//!
8//! 1. [`ProgressTracker::register`] once per table, before any worker
9//! starts, with the total number of output units the table will produce
10//! (chunks for TBL/CSV, row groups for Parquet).
11//! 2. [`ProgressTracker::increment`] after output units are written.
12//! Multiple table-generation tasks may call it concurrently, so impls
13//! must be `Send + Sync` and `increment` itself should be lightweight.
14//! 3. [`ProgressTracker::finish`] once on the success path when the
15//! runner exits. Implementations should use `finish` for normal
16//! success cleanup and `Drop` only as an error or panic fallback.
17//!
18//! `register` and `finish` are invoked serially by the runner and may
19//! do bookkeeping or I/O; `increment` may run concurrently while output
20//! is being written.
21//!
22//! Implementations must not panic and must not propagate I/O errors —
23//! progress reporting is best-effort and must never affect the data
24//! path.
25//!
26//! # Default implementation
27//!
28//! When the `indicatif-progress` feature is enabled (on by default), the
29//! crate provides an `IndicatifProgress` implementation, which renders
30//! one progress bar per table on stderr using the `indicatif` crate.
31//! Library users who do not want to pull in `indicatif` can disable default
32//! features and still supply their own [`ProgressTracker`] implementation.
33//! Without `indicatif-progress` and without a custom tracker, progress
34//! reporting is a no-op.
35//!
36//! # Example: a custom logging tracker
37//!
38//! ```
39//! use std::sync::atomic::{AtomicU64, Ordering};
40//! use tpcgen_cli::tpch_cli::progress::ProgressTracker;
41//! use tpcgen_cli::tpch_cli::Table;
42//!
43//! #[derive(Debug)]
44//! struct LoggingTracker {
45//! written: AtomicU64,
46//! }
47//!
48//! impl ProgressTracker for LoggingTracker {
49//! fn register(&self, table: Table, total: u64) {
50//! eprintln!("plan: {table:?} -> {total} output units");
51//! }
52//! fn increment(&self, _table: Table, units: u64) {
53//! self.written.fetch_add(units, Ordering::Relaxed);
54//! }
55//! fn finish(&self) {
56//! eprintln!("done: {} output units", self.written.load(Ordering::Relaxed));
57//! }
58//! }
59//! ```
60
61use crate::tpch_cli::output_plan::OutputPlan;
62use crate::tpch_cli::Table;
63use std::collections::BTreeMap;
64use std::fmt;
65use std::sync::Arc;
66
67/// Receives generation-progress events for one
68/// [`PlanRunner`](crate::tpch_cli::runner::PlanRunner) invocation.
69///
70/// See the [module-level documentation](self) for the call-order
71/// contract. Trackers are passed to the runner as an
72/// [`std::sync::Arc`] and shared across concurrent generation tasks, so
73/// they must be `Send + Sync`.
74/// They must also be `Debug` so containing types can derive `Debug`.
75pub trait ProgressTracker: Send + Sync + fmt::Debug {
76 /// Pre-register a table with its total expected output-unit count.
77 ///
78 /// Called once per table before any worker starts. Implementations
79 /// that need to know totals up front (e.g. to render a progress bar
80 /// or compute an ETA) should override this; the default does
81 /// nothing.
82 fn register(&self, _table: Table, _total_units: u64) {}
83
84 /// Advance the counter for `table` by `units` output units.
85 ///
86 /// Called after generated output units are written. Multiple
87 /// table-generation tasks may call this concurrently, so implementations
88 /// should be lightweight and must never panic.
89 fn increment(&self, table: Table, units: u64);
90
91 /// Called once after the last [`Self::increment`] on the success
92 /// path. Implementations should use this for normal success cleanup
93 /// and `Drop` only as an error or panic fallback. The default does
94 /// nothing.
95 fn finish(&self) {}
96}
97
98/// Progress handle for one [`PlanRunner::run`](crate::tpch_cli::runner::PlanRunner::run)
99/// invocation.
100///
101/// Owns run-level progress lifecycle: registering totals, accounting for
102/// skipped outputs, and finishing the tracker.
103#[derive(Debug, Clone, Default)]
104pub(crate) struct RunProgress {
105 tracker: Option<Arc<dyn ProgressTracker>>,
106}
107
108impl RunProgress {
109 pub(crate) fn with_tracker(tracker: Arc<dyn ProgressTracker>) -> Self {
110 Self {
111 tracker: Some(tracker),
112 }
113 }
114
115 pub(crate) fn register_totals(&self, plans: &[OutputPlan]) {
116 if let Some(tracker) = self.tracker.as_ref() {
117 let mut totals: BTreeMap<Table, u64> = BTreeMap::new();
118 for plan in plans {
119 *totals.entry(plan.table()).or_insert(0) += plan.chunk_count() as u64;
120 }
121 for (table, total) in totals {
122 tracker.register(table, total);
123 }
124 }
125 }
126
127 pub(crate) fn increment_for_existing(&self, plan: &OutputPlan) {
128 if let Some(tracker) = self.tracker.as_ref() {
129 tracker.increment(plan.table(), plan.chunk_count() as u64);
130 }
131 }
132
133 pub(crate) fn for_table(&self, table: Table) -> TableProgress {
134 TableProgress::for_table(self.tracker.clone(), table)
135 }
136
137 pub(crate) fn finish(self) {
138 if let Some(tracker) = self.tracker {
139 tracker.finish();
140 }
141 }
142}
143
144/// Progress handle for one table output stream.
145///
146/// Used by format writers to report each successfully written output unit
147/// without knowing whether progress tracking is enabled.
148#[derive(Clone, Default)]
149pub(crate) struct TableProgress {
150 tracker: Option<(Arc<dyn ProgressTracker>, Table)>,
151}
152
153impl TableProgress {
154 pub(crate) fn for_table(progress: Option<Arc<dyn ProgressTracker>>, table: Table) -> Self {
155 Self {
156 tracker: progress.map(|progress| (progress, table)),
157 }
158 }
159
160 pub(crate) fn increment_output_unit(&self) {
161 if let Some((progress, table)) = self.tracker.as_ref() {
162 progress.increment(*table, 1);
163 }
164 }
165}
166
167#[cfg(feature = "indicatif-progress")]
168pub use indicatif_impl::IndicatifProgress;
169
170#[cfg(feature = "indicatif-progress")]
171mod indicatif_impl {
172 use super::ProgressTracker;
173 use crate::tpch_cli::Table;
174 use indicatif::{MultiProgress, ProgressBar, ProgressFinish, ProgressStyle};
175 use std::collections::BTreeMap;
176 use std::io::{self, Write};
177 use std::sync::{OnceLock, RwLock};
178
179 /// Default [`ProgressTracker`] implementation backed by
180 /// [`indicatif::MultiProgress`].
181 ///
182 /// Renders one bar per table on stderr. Bars are pre-allocated in
183 /// [`ProgressTracker::register`] and are looked up by [`Table`] on
184 /// each [`ProgressTracker::increment`] call. Lookup uses a `RwLock`
185 /// read on the increment path; this is uncontended after the serial
186 /// `register` phase completes.
187 #[derive(Debug)]
188 pub struct IndicatifProgress {
189 multi: MultiProgress,
190 tables: RwLock<BTreeMap<Table, ProgressBar>>,
191 }
192
193 impl IndicatifProgress {
194 /// Construct an empty tracker. Tables are added via
195 /// [`ProgressTracker::register`].
196 pub fn new() -> Self {
197 Self {
198 multi: MultiProgress::new(),
199 tables: RwLock::new(BTreeMap::new()),
200 }
201 }
202
203 /// Return a writer that coordinates stderr log writes with progress
204 /// bar redraws.
205 pub fn log_writer(&self) -> Box<dyn io::Write + Send + 'static> {
206 Box::new(IndicatifLogWriter {
207 multi: self.multi.clone(),
208 })
209 }
210 }
211
212 impl Default for IndicatifProgress {
213 fn default() -> Self {
214 Self::new()
215 }
216 }
217
218 impl ProgressTracker for IndicatifProgress {
219 fn register(&self, table: Table, total_units: u64) {
220 let Ok(mut tables) = self.tables.write() else {
221 return;
222 };
223
224 let pb = self.multi.add(ProgressBar::new(total_units));
225 pb.set_style(bar_style().clone());
226 pb.set_message(table.to_string());
227 let pb = pb.with_finish(ProgressFinish::AndLeave);
228 // Write-lock is only contended during the register phase, which
229 // happens serially before any worker task starts.
230 tables.insert(table, pb);
231 }
232
233 fn increment(&self, table: Table, units: u64) {
234 // Minimize the read-lock scope so concurrent `increment` callers
235 // don't serialize on it. Cloning the bar is a cheap `Arc` bump,
236 // and `ProgressBar::inc` is internally thread-safe.
237 let bar = {
238 let Ok(tables) = self.tables.read() else {
239 return;
240 };
241 tables.get(&table).cloned()
242 };
243 if let Some(bar) = bar {
244 bar.inc(units);
245 }
246 }
247
248 fn finish(&self) {
249 let bars = {
250 let Ok(tables) = self.tables.read() else {
251 return;
252 };
253 tables.values().cloned().collect::<Vec<_>>()
254 };
255 for bar in bars {
256 bar.finish_using_style();
257 }
258 }
259 }
260
261 fn bar_style() -> &'static ProgressStyle {
262 static STYLE: OnceLock<ProgressStyle> = OnceLock::new();
263 STYLE.get_or_init(|| {
264 ProgressStyle::default_bar()
265 .template("{msg:10} [{bar:28}] Progress: {percent:>3}%")
266 .expect("static progress bar template is valid")
267 .progress_chars("█▓░")
268 })
269 }
270
271 struct IndicatifLogWriter {
272 multi: MultiProgress,
273 }
274
275 impl Write for IndicatifLogWriter {
276 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
277 self.multi.suspend(|| {
278 let mut stderr = io::stderr().lock();
279 stderr.write(buf)
280 })
281 }
282
283 fn flush(&mut self) -> io::Result<()> {
284 self.multi.suspend(|| {
285 let mut stderr = io::stderr().lock();
286 stderr.flush()
287 })
288 }
289 }
290
291 #[cfg(test)]
292 mod tests {
293 use super::*;
294
295 #[test]
296 fn registers_and_increments() {
297 let t = IndicatifProgress::new();
298 t.register(Table::Lineitem, 60);
299 t.register(Table::Orders, 15);
300 t.increment(Table::Lineitem, 1);
301 t.increment(Table::Orders, 5);
302
303 let tables = t.tables.read().unwrap();
304 assert_eq!(tables[&Table::Lineitem].position(), 1);
305 assert_eq!(tables[&Table::Orders].position(), 5);
306 }
307
308 #[test]
309 fn reaches_total() {
310 let t = IndicatifProgress::new();
311 t.register(Table::Orders, 5);
312 for _ in 0..5 {
313 t.increment(Table::Orders, 1);
314 }
315 assert_eq!(t.tables.read().unwrap()[&Table::Orders].position(), 5);
316 }
317
318 #[test]
319 fn unknown_table_is_no_op() {
320 // Incrementing a table not registered must not panic.
321 let t = IndicatifProgress::new();
322 t.register(Table::Orders, 1);
323 t.increment(Table::Lineitem, 1);
324 assert_eq!(t.tables.read().unwrap()[&Table::Orders].position(), 0);
325 }
326
327 #[test]
328 fn finish_marks_registered_bars_finished() {
329 let t = IndicatifProgress::new();
330 t.register(Table::Orders, 2);
331 t.increment(Table::Orders, 2);
332 t.finish();
333
334 assert!(t.tables.read().unwrap()[&Table::Orders].is_finished());
335 }
336 }
337}
338
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tpch_cli::Table;
    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc, Mutex,
    };

    /// Tracker double that records every event it receives. It shows
    /// that the trait is dyn-compatible and can be implemented by
    /// external code without depending on `indicatif`.
    #[derive(Debug, Default)]
    struct MockTracker {
        registered: Mutex<Vec<(Table, u64)>>,
        total_increments: AtomicU64,
        finished: AtomicU64,
    }

    impl ProgressTracker for MockTracker {
        fn register(&self, table: Table, total_units: u64) {
            let mut seen = self.registered.lock().unwrap();
            seen.push((table, total_units));
        }

        fn increment(&self, _table: Table, units: u64) {
            self.total_increments.fetch_add(units, Ordering::Relaxed);
        }

        fn finish(&self) {
            self.finished.fetch_add(1, Ordering::Relaxed);
        }
    }

    #[test]
    fn mock_tracker_works_through_arc_dyn() {
        let mock = Arc::new(MockTracker::default());
        let dynamic: Arc<dyn ProgressTracker> = mock.clone();

        // Drive the full lifecycle through the trait object only.
        dynamic.register(Table::Lineitem, 10);
        dynamic.register(Table::Orders, 4);
        dynamic.increment(Table::Lineitem, 3);
        dynamic.increment(Table::Orders, 1);
        dynamic.finish();

        let expected_registrations = vec![(Table::Lineitem, 10), (Table::Orders, 4)];
        assert_eq!(*mock.registered.lock().unwrap(), expected_registrations);
        assert_eq!(mock.total_increments.load(Ordering::Relaxed), 4);
        assert_eq!(mock.finished.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn default_register_and_finish_are_noops() {
        // Overriding nothing but `increment` must be enough to compile
        // and run.
        #[derive(Debug)]
        struct Minimal(AtomicU64);

        impl ProgressTracker for Minimal {
            fn increment(&self, _t: Table, c: u64) {
                self.0.fetch_add(c, Ordering::Relaxed);
            }
        }

        let tracker = Minimal(AtomicU64::new(0));
        tracker.register(Table::Region, 99); // provided no-op
        tracker.increment(Table::Region, 7);
        tracker.finish(); // provided no-op
        assert_eq!(tracker.0.load(Ordering::Relaxed), 7);
    }
}
404}