ceres_core/progress.rs
1//! Progress reporting for harvest operations.
2//!
3//! This module provides a trait-based abstraction for reporting progress during
4//! harvest operations, enabling decoupled logging and UI updates.
5
6use std::time::Duration;
7
8use crate::{BatchHarvestSummary, SyncStats};
9
/// Events emitted during harvesting operations.
///
/// These events provide fine-grained progress information that consumers
/// can use for logging, UI updates, or metrics collection.
#[derive(Debug, Clone)]
pub enum HarvestEvent<'a> {
    /// Batch harvest starting.
    BatchStarted {
        /// Total number of portals to harvest.
        total_portals: usize,
    },

    /// Single portal harvest starting.
    PortalStarted {
        /// Zero-based index of the current portal.
        portal_index: usize,
        /// Total number of portals in batch.
        total_portals: usize,
        /// Portal name identifier.
        portal_name: &'a str,
        /// Portal URL.
        portal_url: &'a str,
    },

    /// Found existing datasets in database for portal.
    ExistingDatasetsFound {
        /// Number of existing datasets.
        count: usize,
    },

    /// Found datasets on the portal.
    PortalDatasetsFound {
        /// Number of datasets found.
        count: usize,
    },

    /// Progress update during dataset processing.
    ///
    /// The outcome counters (`created`, `updated`, `unchanged`, `failed`,
    /// `skipped`) are running totals; they sum to `current`.
    DatasetProcessed {
        /// Number of datasets processed so far.
        current: usize,
        /// Total number of datasets to process.
        total: usize,
        /// Number of newly created datasets.
        created: usize,
        /// Number of updated datasets.
        updated: usize,
        /// Number of unchanged datasets.
        unchanged: usize,
        /// Number of failed datasets.
        failed: usize,
        /// Number of datasets skipped due to circuit breaker.
        skipped: usize,
    },

    /// Single portal harvest completed successfully.
    PortalCompleted {
        /// Zero-based index of the current portal.
        portal_index: usize,
        /// Total number of portals in batch.
        total_portals: usize,
        /// Portal name identifier.
        portal_name: &'a str,
        /// Final statistics.
        stats: &'a SyncStats,
    },

    /// Single portal harvest failed.
    PortalFailed {
        /// Zero-based index of the current portal.
        portal_index: usize,
        /// Total number of portals in batch.
        total_portals: usize,
        /// Portal name identifier.
        portal_name: &'a str,
        /// Error description.
        error: &'a str,
    },

    /// Batch harvest completed.
    BatchCompleted {
        /// Aggregated summary of all portal results.
        summary: &'a BatchHarvestSummary,
    },

    /// Single portal harvest was cancelled.
    PortalCancelled {
        /// Zero-based index of the current portal.
        portal_index: usize,
        /// Total number of portals in batch.
        total_portals: usize,
        /// Portal name identifier.
        portal_name: &'a str,
        /// Partial statistics at cancellation time.
        stats: &'a SyncStats,
    },

    /// Batch harvest was cancelled.
    BatchCancelled {
        /// Number of portals completed before cancellation.
        completed_portals: usize,
        /// Total number of portals in batch.
        total_portals: usize,
    },

    /// Datasets marked as stale after a successful full sync.
    StaleDetected {
        /// Number of datasets newly marked as stale.
        count: usize,
    },

    /// Circuit breaker is open, harvest pausing/failing.
    CircuitBreakerOpen {
        /// Service name.
        service: &'a str,
        /// Time until recovery attempt.
        retry_after: Duration,
    },

    /// Pre-processing phase (delta detection / hash check) starting.
    PreprocessingStarted {
        /// Total number of datasets to pre-process.
        total: usize,
    },

    /// Pre-processing phase completed — summary before finalization steps
    /// (stale detection).
    ///
    /// Note: in the streaming pipeline, preprocessing and persistence are
    /// interleaved, so this fires after both have completed.
    PreprocessingCompleted {
        /// Number of datasets that need persistence (created + updated).
        changed: usize,
        /// Number of datasets with identical content hash.
        unchanged: usize,
        /// Number of datasets that failed pre-processing.
        failed: usize,
    },
}
148
149/// Trait for reporting harvest progress.
150///
151/// Implementors can provide CLI output, server event streams, metrics,
152/// or any other form of progress reporting.
153///
154/// The default implementation does nothing (silent mode), which is
155/// appropriate for library usage where the caller doesn't need progress updates.
156///
157/// # Example
158///
159/// ```
160/// use ceres_core::progress::{ProgressReporter, HarvestEvent};
161///
162/// struct MyReporter;
163///
164/// impl ProgressReporter for MyReporter {
165/// fn report(&self, event: HarvestEvent<'_>) {
166/// match event {
167/// HarvestEvent::PortalStarted { portal_name, .. } => {
168/// println!("Starting: {}", portal_name);
169/// }
170/// _ => {}
171/// }
172/// }
173/// }
174/// ```
175pub trait ProgressReporter: Send + Sync {
176 /// Called when a harvest event occurs.
177 ///
178 /// The default implementation does nothing (silent mode).
179 fn report(&self, event: HarvestEvent<'_>) {
180 // Default: do nothing (silent mode for library usage)
181 let _ = event;
182 }
183}
184
/// A no-op reporter that ignores all events.
///
/// Use this when you don't need progress reporting (library mode).
#[derive(Debug, Default, Clone, Copy)]
pub struct SilentReporter;

// Relies entirely on the trait's default `report`, which discards the event.
impl ProgressReporter for SilentReporter {}
192
193/// A reporter that logs events using the `tracing` crate.
194///
195/// This is suitable for CLI applications that want structured logging.
196#[derive(Debug, Default, Clone, Copy)]
197pub struct TracingReporter;
198
199impl ProgressReporter for TracingReporter {
200 fn report(&self, event: HarvestEvent<'_>) {
201 use tracing::{error, info};
202
203 match event {
204 HarvestEvent::BatchStarted { total_portals } => {
205 info!("Starting batch harvest of {} portal(s)", total_portals);
206 }
207 HarvestEvent::PortalStarted {
208 portal_index,
209 total_portals,
210 portal_name,
211 portal_url,
212 } => {
213 info!(
214 "[Portal {}/{}] {} ({})",
215 portal_index + 1,
216 total_portals,
217 portal_name,
218 portal_url
219 );
220 }
221 HarvestEvent::ExistingDatasetsFound { count } => {
222 info!("Found {} existing dataset(s) in database", count);
223 }
224 HarvestEvent::PortalDatasetsFound { count } => {
225 info!("Found {} dataset(s) on portal", count);
226 }
227 HarvestEvent::DatasetProcessed {
228 current,
229 total,
230 created,
231 updated,
232 unchanged,
233 failed,
234 skipped,
235 } => {
236 let pct = (current as f64 / total as f64 * 100.0) as u8;
237 if skipped > 0 {
238 info!(
239 "Progress: {}/{} ({}%) - {} new, {} updated, {} unchanged, {} failed, {} skipped",
240 current, total, pct, created, updated, unchanged, failed, skipped
241 );
242 } else {
243 info!(
244 "Progress: {}/{} ({}%) - {} new, {} updated, {} unchanged, {} failed",
245 current, total, pct, created, updated, unchanged, failed
246 );
247 }
248 }
249 HarvestEvent::PortalCompleted {
250 portal_index,
251 total_portals,
252 portal_name,
253 stats,
254 } => {
255 info!(
256 "[Portal {}/{}] {} completed: {} dataset(s) ({} created, {} updated, {} unchanged)",
257 portal_index + 1,
258 total_portals,
259 portal_name,
260 stats.total(),
261 stats.created,
262 stats.updated,
263 stats.unchanged
264 );
265 }
266 HarvestEvent::PortalFailed {
267 portal_index,
268 total_portals,
269 portal_name,
270 error,
271 } => {
272 error!(
273 "[Portal {}/{}] {} failed: {}",
274 portal_index + 1,
275 total_portals,
276 portal_name,
277 error
278 );
279 }
280 HarvestEvent::BatchCompleted { summary } => {
281 info!(
282 "Batch complete: {} portal(s), {} dataset(s) ({} successful, {} failed)",
283 summary.total_portals(),
284 summary.total_datasets(),
285 summary.successful_count(),
286 summary.failed_count()
287 );
288 }
289 HarvestEvent::PortalCancelled {
290 portal_index,
291 total_portals,
292 portal_name,
293 stats,
294 } => {
295 info!(
296 "[Portal {}/{}] {} cancelled: {} dataset(s) processed ({} created, {} updated, {} unchanged)",
297 portal_index + 1,
298 total_portals,
299 portal_name,
300 stats.total(),
301 stats.created,
302 stats.updated,
303 stats.unchanged
304 );
305 }
306 HarvestEvent::BatchCancelled {
307 completed_portals,
308 total_portals,
309 } => {
310 info!(
311 "Batch cancelled: {}/{} portal(s) completed before cancellation",
312 completed_portals, total_portals
313 );
314 }
315 HarvestEvent::StaleDetected { count } => {
316 use tracing::warn;
317 warn!(
318 "{} dataset(s) marked as stale (no longer found on portal)",
319 count
320 );
321 }
322 HarvestEvent::CircuitBreakerOpen {
323 service,
324 retry_after,
325 } => {
326 use tracing::warn;
327 warn!(
328 "Circuit breaker '{}' is open. Retry after {} seconds.",
329 service,
330 retry_after.as_secs()
331 );
332 }
333 HarvestEvent::PreprocessingStarted { total } => {
334 info!("Pre-processing {} dataset(s) (delta detection)...", total);
335 }
336 HarvestEvent::PreprocessingCompleted {
337 changed,
338 unchanged,
339 failed,
340 } => {
341 info!(
342 "Pre-processing complete: {} changed, {} unchanged, {} failed",
343 changed, unchanged, failed
344 );
345 }
346 }
347 }
348}
349
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_silent_reporter_does_nothing() {
        // Reporting through the silent reporter is a no-op and must not panic.
        SilentReporter.report(HarvestEvent::BatchStarted { total_portals: 5 });
    }

    #[test]
    fn test_tracing_reporter_handles_all_events() {
        let stats = SyncStats {
            unchanged: 5,
            updated: 3,
            created: 2,
            failed: 0,
            skipped: 0,
        };
        let summary = BatchHarvestSummary::new();

        // One instance of every variant; the reporter must handle each
        // without panicking.
        let events = vec![
            HarvestEvent::BatchStarted { total_portals: 2 },
            HarvestEvent::PortalStarted {
                portal_index: 0,
                total_portals: 2,
                portal_name: "test",
                portal_url: "https://example.com",
            },
            HarvestEvent::ExistingDatasetsFound { count: 10 },
            HarvestEvent::PortalDatasetsFound { count: 20 },
            HarvestEvent::DatasetProcessed {
                current: 10,
                total: 20,
                created: 2,
                updated: 3,
                unchanged: 5,
                failed: 0,
                skipped: 0,
            },
            HarvestEvent::PortalCompleted {
                portal_index: 0,
                total_portals: 2,
                portal_name: "test",
                stats: &stats,
            },
            HarvestEvent::PortalFailed {
                portal_index: 1,
                total_portals: 2,
                portal_name: "test2",
                error: "connection failed",
            },
            HarvestEvent::BatchCompleted { summary: &summary },
            HarvestEvent::PortalCancelled {
                portal_index: 0,
                total_portals: 2,
                portal_name: "test",
                stats: &stats,
            },
            HarvestEvent::BatchCancelled {
                completed_portals: 1,
                total_portals: 3,
            },
            HarvestEvent::StaleDetected { count: 5 },
            HarvestEvent::CircuitBreakerOpen {
                service: "gemini",
                retry_after: Duration::from_secs(30),
            },
            HarvestEvent::PreprocessingStarted { total: 100 },
            HarvestEvent::PreprocessingCompleted {
                changed: 10,
                unchanged: 85,
                failed: 5,
            },
        ];

        let reporter = TracingReporter;
        for event in events {
            reporter.report(event);
        }
    }

    #[test]
    fn test_default_implementations() {
        // Both bundled reporters accept events through the trait surface.
        let event = HarvestEvent::BatchStarted { total_portals: 1 };
        SilentReporter.report(event.clone());
        TracingReporter.report(event);
    }
}
446}