1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
//! Smart progress tracking for batch indexing operations.
//!
//! Provides three-phase progress display:
//! 1. Pre-scan: Count files and estimate chunks
//! 2. Calibration: Measure embedding speed on first file
//! 3. Indexing: Progress bar with ETA based on calibration
//!
//! This module is only available when the `cli` feature is enabled.
use indicatif::{ProgressBar, ProgressStyle};
use std::path::PathBuf;
use std::time::Instant;
/// Format a byte count as a human-readable string.
///
/// Uses binary units (1 KB = 1024 B) with one decimal place for
/// KB/MB/GB, and a plain integer for values below 1 KB.
fn format_bytes(bytes: u64) -> String {
    // Thresholds ordered largest-first; the first one that fits wins.
    const UNITS: [(u64, &str); 3] = [
        (1024 * 1024 * 1024, "GB"),
        (1024 * 1024, "MB"),
        (1024, "KB"),
    ];
    for &(scale, suffix) in UNITS.iter() {
        if bytes >= scale {
            return format!("{:.1} {}", bytes as f64 / scale as f64, suffix);
        }
    }
    format!("{} B", bytes)
}
/// Progress tracker for batch indexing operations.
///
/// Implements a three-phase progress display:
/// 1. Pre-scan phase: counts files and estimates total chunks from byte sizes
/// 2. Calibration phase: measures embedding speed on the first file
/// 3. Indexing phase: displays a progress bar with ETA derived from the
///    measured speed (kept fresh via an exponential moving average)
pub struct IndexProgressTracker {
    // Pre-scan phase
    /// Total number of files to process (includes files whose metadata
    /// could not be read during pre-scan).
    pub total_files: usize,
    /// Total size of all readable files in bytes.
    pub total_bytes: u64,
    /// Assumed average chunk size in characters, used only for the
    /// initial chunk-count estimate (1 byte treated as 1 char).
    pub chunk_size: usize,
    /// Estimated total chunks; starts as `total_bytes / chunk_size` and
    /// may be refined later via `adjust_estimate`.
    pub estimated_chunks: usize,
    // Calibration phase
    /// When calibration started; `None` until `start_calibration` is called.
    calibration_start: Option<Instant>,
    /// Measured throughput in chunks per second; seeded by calibration and
    /// then updated dynamically as an exponential moving average (EMA).
    pub chunks_per_sec: Option<f64>,
    /// Name of the embedder model (for display only).
    pub embedder_model: Option<String>,
    /// Whether `finish_calibration` has completed successfully.
    calibration_done: bool,
    // Dynamic speed tracking (rolling average)
    /// Last time the EMA speed was refreshed.
    last_speed_update: Option<Instant>,
    /// Chunks processed since the last EMA refresh (the measurement window).
    chunks_since_update: usize,
    /// EMA smoothing factor: weight given to the newest measurement
    /// (0.3 = 30% new, 70% old).
    speed_ema_alpha: f64,
    // Progress tracking
    /// Number of chunks processed so far.
    pub processed_chunks: usize,
    /// Number of files processed in total (indexed + skipped + failed).
    pub processed_files: usize,
    /// Number of files skipped as duplicates.
    pub skipped_files: usize,
    /// Number of files that failed to index.
    pub failed_files: usize,
    /// Number of files successfully indexed.
    pub indexed_files: usize,
    // Progress bar
    /// Active progress bar, present only during Phase 3.
    progress_bar: Option<ProgressBar>,
}
impl IndexProgressTracker {
    /// Create a new progress tracker by pre-scanning the given paths.
    ///
    /// This performs Phase 1: Pre-scan. File sizes are read from filesystem
    /// metadata; paths whose metadata cannot be read still count toward
    /// `total_files` but contribute 0 bytes to the estimate.
    pub fn pre_scan(paths: &[PathBuf]) -> Self {
        let total_bytes: u64 = paths
            .iter()
            .filter_map(|p| std::fs::metadata(p).ok())
            .map(|m| m.len())
            .sum();
        // Rough estimate: 1 chunk per 500 chars, avg 1 byte = 1 char for text files
        // This is conservative - actual chunk count depends on slice mode
        let chunk_size = 500;
        let estimated_chunks = (total_bytes as usize) / chunk_size;
        Self {
            total_files: paths.len(),
            total_bytes,
            chunk_size,
            estimated_chunks,
            calibration_start: None,
            chunks_per_sec: None,
            embedder_model: None,
            calibration_done: false,
            last_speed_update: None,
            chunks_since_update: 0,
            speed_ema_alpha: 0.3, // 30% weight for new measurements
            processed_chunks: 0,
            processed_files: 0,
            skipped_files: 0,
            failed_files: 0,
            indexed_files: 0,
            progress_bar: None,
        }
    }

    /// Display Phase 1 pre-scan summary to stderr.
    pub fn display_pre_scan(&self) {
        eprintln!();
        eprintln!("Phase 1: Pre-scan");
        eprintln!(" |-- Files: {}", self.total_files);
        eprintln!(" |-- Total size: {}", format_bytes(self.total_bytes));
        eprintln!(
            " `-- Est. chunks: ~{} (@ {} chars/chunk)",
            self.estimated_chunks, self.chunk_size
        );
    }

    /// Start the calibration phase (Phase 2).
    ///
    /// Call this before processing the first file.
    pub fn start_calibration(&mut self) {
        self.calibration_start = Some(Instant::now());
        eprintln!();
        eprintln!("Phase 2: Calibration (first file)...");
    }

    /// Finish calibration with results from the first file.
    ///
    /// Computes the initial chunks/sec speed and seeds the dynamic EMA
    /// tracking. Does nothing if `start_calibration` was never called.
    ///
    /// # Arguments
    /// * `chunks_processed` - Number of chunks indexed in first file
    /// * `model` - Name of the embedder model used
    pub fn finish_calibration(&mut self, chunks_processed: usize, model: &str) {
        if let Some(start) = self.calibration_start {
            let elapsed = start.elapsed();
            // Guard against zero elapsed time / zero chunks so we never
            // record an infinite or meaningless speed.
            if elapsed.as_secs_f64() > 0.0 && chunks_processed > 0 {
                self.chunks_per_sec = Some(chunks_processed as f64 / elapsed.as_secs_f64());
            }
            self.embedder_model = Some(model.to_string());
            self.calibration_done = true;
            // Initialize dynamic speed tracking
            self.last_speed_update = Some(Instant::now());
            self.chunks_since_update = 0;
            eprintln!(
                " `-- Speed: {:.1} chunks/sec ({}) [dynamic]",
                self.chunks_per_sec.unwrap_or(0.0),
                model
            );
        }
    }

    /// Check if calibration has been completed.
    pub fn is_calibrated(&self) -> bool {
        self.calibration_done
    }

    /// Start the progress bar for Phase 3: Indexing.
    ///
    /// Creates a progress bar sized from `estimated_chunks`; the length is
    /// refined later by `adjust_estimate`.
    pub fn start_progress_bar(&mut self) {
        let pb = ProgressBar::new(self.estimated_chunks as u64);
        pb.set_style(
            ProgressStyle::default_bar()
                .template(
                    "{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} chunks | ETA: {eta} | {msg}",
                )
                .expect("Invalid progress bar template")
                .progress_chars("#>-"),
        );
        eprintln!();
        eprintln!("Phase 3: Indexing...");
        self.progress_bar = Some(pb);
    }

    /// Increment the chunk counter and update the progress bar.
    ///
    /// Also updates the rolling average speed (EMA) every 2 seconds
    /// to reflect actual embedding performance after GPU warm-up.
    pub fn inc_chunks(&mut self, count: usize) {
        self.processed_chunks += count;
        self.chunks_since_update += count;
        // Update speed every 2 seconds using EMA
        if let Some(last_update) = self.last_speed_update {
            let elapsed = last_update.elapsed().as_secs_f64();
            if elapsed >= 2.0 && self.chunks_since_update > 0 {
                let current_speed = self.chunks_since_update as f64 / elapsed;
                // Exponential Moving Average: new = alpha * current + (1-alpha) * old
                self.chunks_per_sec = Some(match self.chunks_per_sec {
                    Some(old_speed) => {
                        self.speed_ema_alpha * current_speed
                            + (1.0 - self.speed_ema_alpha) * old_speed
                    }
                    None => current_speed,
                });
                // Reset for next measurement window
                self.last_speed_update = Some(Instant::now());
                self.chunks_since_update = 0;
            }
        }
        if let Some(ref pb) = self.progress_bar {
            pb.set_position(self.processed_chunks as u64);
        }
    }

    /// Record a successfully indexed file.
    ///
    /// # Arguments
    /// * `chunks` - Number of chunks created from this file
    pub fn file_indexed(&mut self, chunks: usize) {
        self.indexed_files += 1;
        self.processed_files += 1;
        self.inc_chunks(chunks);
    }

    /// Record a skipped file (duplicate).
    pub fn file_skipped(&mut self) {
        self.skipped_files += 1;
        self.processed_files += 1;
    }

    /// Record a failed file.
    pub fn file_failed(&mut self) {
        self.failed_files += 1;
        self.processed_files += 1;
    }

    /// Set the current message on the progress bar (no-op before Phase 3).
    pub fn set_message(&mut self, msg: &str) {
        if let Some(ref pb) = self.progress_bar {
            pb.set_message(msg.to_string());
        }
    }

    /// Finish the progress bar with a completion message (no-op before Phase 3).
    pub fn finish(&mut self) {
        if let Some(ref pb) = self.progress_bar {
            pb.finish_with_message("Complete");
        }
    }

    /// Display final summary after indexing is complete.
    ///
    /// Tree connectors are chosen based on which lines are actually
    /// printed, so the last entry at each level always renders with
    /// a terminal `` `-- `` instead of a dangling `|--`.
    pub fn display_summary(&self) {
        let has_speed = self.chunks_per_sec.is_some();
        let show_skipped = self.skipped_files > 0;
        let show_failed = self.failed_files > 0;
        // "Files processed" is the last top-level entry when there is no
        // speed line after it; its children then need a blank continuation
        // prefix instead of a vertical bar.
        let files_conn = if has_speed { "|" } else { "`" };
        let cont = if has_speed { " |" } else { "  " };
        eprintln!();
        eprintln!("Indexing complete:");
        eprintln!(" |-- Chunks indexed: {}", self.processed_chunks);
        eprintln!(" {}-- Files processed: {}", files_conn, self.processed_files);
        eprintln!(
            "{} {}-- Indexed: {}",
            cont,
            if show_skipped || show_failed { "|" } else { "`" },
            self.indexed_files
        );
        if show_skipped {
            eprintln!(
                "{} {}-- Skipped (duplicate): {}",
                cont,
                if show_failed { "|" } else { "`" },
                self.skipped_files
            );
        }
        if show_failed {
            eprintln!("{} `-- Failed: {}", cont, self.failed_files);
        }
        if let Some(speed) = self.chunks_per_sec {
            eprintln!(" `-- Avg speed: {:.1} chunks/sec", speed);
        }
    }

    /// Adjust estimated chunks based on actual chunk count from calibration file.
    ///
    /// This improves ETA accuracy after calibration by using the actual
    /// bytes-to-chunks ratio observed in the first file. No-op when either
    /// argument is zero (ratio would be undefined).
    pub fn adjust_estimate(&mut self, file_bytes: u64, actual_chunks: usize) {
        if file_bytes > 0 && actual_chunks > 0 {
            // Calculate actual bytes per chunk ratio
            let bytes_per_chunk = file_bytes as f64 / actual_chunks as f64;
            // Remaining bytes to process
            let remaining_bytes = self.total_bytes.saturating_sub(file_bytes);
            // Estimate remaining chunks based on actual ratio
            let remaining_chunks = (remaining_bytes as f64 / bytes_per_chunk) as usize;
            // Update total estimate
            self.estimated_chunks = actual_chunks + remaining_chunks;
            // Update progress bar length if active
            if let Some(ref pb) = self.progress_bar {
                pb.set_length(self.estimated_chunks as u64);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Table-driven check of the byte-formatting thresholds.
    #[test]
    fn test_format_bytes() {
        let cases: [(u64, &str); 6] = [
            (0, "0 B"),
            (500, "500 B"),
            (1024, "1.0 KB"),
            (1536, "1.5 KB"),
            (1024 * 1024, "1.0 MB"),
            (1024 * 1024 * 1024, "1.0 GB"),
        ];
        for &(input, expected) in cases.iter() {
            assert_eq!(format_bytes(input), expected);
        }
    }

    #[test]
    fn test_pre_scan_empty() {
        let tracker = IndexProgressTracker::pre_scan(&[]);
        assert_eq!(tracker.total_files, 0);
        assert_eq!(tracker.total_bytes, 0);
        assert_eq!(tracker.estimated_chunks, 0);
    }

    #[test]
    fn test_pre_scan_with_files() {
        let dir = TempDir::new().unwrap();
        let first = dir.path().join("file1.txt");
        let second = dir.path().join("file2.txt");
        // Create test files with known sizes
        std::fs::write(&first, vec![b'a'; 1000]).unwrap();
        std::fs::write(&second, vec![b'b'; 500]).unwrap();
        let tracker = IndexProgressTracker::pre_scan(&[first, second]);
        assert_eq!(tracker.total_files, 2);
        assert_eq!(tracker.total_bytes, 1500);
        // 1500 bytes / 500 chars per chunk = 3 estimated chunks
        assert_eq!(tracker.estimated_chunks, 3);
    }

    #[test]
    fn test_file_tracking() {
        let mut tracker = IndexProgressTracker::pre_scan(&[]);
        // Simulate indexing some files
        tracker.file_indexed(10);
        tracker.file_indexed(5);
        tracker.file_skipped();
        tracker.file_failed();
        assert_eq!(tracker.processed_files, 4);
        assert_eq!(tracker.indexed_files, 2);
        assert_eq!(tracker.skipped_files, 1);
        assert_eq!(tracker.failed_files, 1);
        assert_eq!(tracker.processed_chunks, 15);
    }

    #[test]
    fn test_calibration_flow() {
        let mut tracker = IndexProgressTracker::pre_scan(&[]);
        assert!(!tracker.is_calibrated());
        tracker.start_calibration();
        // Simulate some processing time
        std::thread::sleep(std::time::Duration::from_millis(10));
        tracker.finish_calibration(100, "test-model");
        assert!(tracker.is_calibrated());
        assert!(tracker.chunks_per_sec.is_some());
        assert_eq!(tracker.embedder_model, Some("test-model".to_string()));
    }

    #[test]
    fn test_adjust_estimate() {
        let mut tracker = IndexProgressTracker::pre_scan(&[]);
        tracker.total_bytes = 10000;
        tracker.estimated_chunks = 20;
        // First file: 1000 bytes produced 5 chunks
        // Ratio: 200 bytes per chunk
        // Remaining: 9000 bytes -> 45 chunks
        // Total: 5 + 45 = 50 chunks
        tracker.adjust_estimate(1000, 5);
        assert_eq!(tracker.estimated_chunks, 50);
    }
}