Skip to main content

yt_dlp/stats/
tracker.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use tokio::sync::RwLock;
5use tokio::sync::broadcast::error::RecvError;
6use tokio::task::JoinHandle;
7
8use super::config::TrackerConfig;
9use super::inner::{CompletedDownload, DownloadOutcome, InProgressDownload, StatsInner};
10use super::snapshot::{
11    ActiveDownloadSnapshot, DownloadOutcomeSnapshot, DownloadSnapshot, DownloadStats, FetchStats, GlobalSnapshot,
12    PlaylistStats, PostProcessStats,
13};
14use crate::download::DownloadPriority;
15use crate::events::{DownloadEvent, EventBus};
16
17/// Subscribes to the event bus and maintains running statistics about all download
18/// and metadata fetch operations.
19///
20/// The tracker runs a background task that processes events from the [`EventBus`] and
21/// updates its internal counters. Use [`snapshot`](StatisticsTracker::snapshot) to
22/// retrieve a point-in-time view of the collected data.
23///
24/// # Examples
25///
26/// ```rust,no_run
27/// # use yt_dlp::Downloader;
28/// # use yt_dlp::client::deps::Libraries;
29/// # use std::path::PathBuf;
30/// # #[tokio::main]
31/// # async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
32/// let libraries = Libraries::new(PathBuf::from("libs/yt-dlp"), PathBuf::from("libs/ffmpeg"));
33/// let downloader = Downloader::builder(libraries, "output").build().await?;
34///
35/// // ... perform downloads ...
36///
37/// let snapshot = downloader.statistics().snapshot().await;
38/// println!("Completed: {}", snapshot.downloads.completed);
39/// println!("Total bytes: {}", snapshot.downloads.total_bytes);
40/// # Ok(())
41/// # }
42/// ```
43pub struct StatisticsTracker {
44    inner: Arc<RwLock<StatsInner>>,
45    // Kept alive to ensure the background task runs for as long as the tracker lives.
46    _task: JoinHandle<()>,
47}
48
49impl StatisticsTracker {
50    /// Creates a tracker with default configuration and subscribes to `bus`.
51    ///
52    /// # Arguments
53    ///
54    /// * `bus` - The event bus to subscribe to.
55    ///
56    /// # Returns
57    ///
58    /// A new [`StatisticsTracker`] that is already running its background event loop.
59    pub fn new(bus: &EventBus) -> Self {
60        Self::with_config(bus, TrackerConfig::default())
61    }
62
63    /// Creates a tracker with custom configuration and subscribes to `bus`.
64    ///
65    /// # Arguments
66    ///
67    /// * `bus` - The event bus to subscribe to.
68    /// * `config` - History-size bounds and other settings.
69    ///
70    /// # Returns
71    ///
72    /// A new [`StatisticsTracker`] that is already running its background event loop.
73    pub fn with_config(bus: &EventBus, config: TrackerConfig) -> Self {
74        tracing::debug!(
75            max_history = config.max_download_history,
76            "📊 Creating statistics tracker"
77        );
78
79        let inner = Arc::new(RwLock::new(StatsInner::new(config)));
80        let rx = bus.subscribe();
81        let inner_clone = inner.clone();
82
83        let task = tokio::spawn(run_event_loop(inner_clone, rx));
84
85        Self { inner, _task: task }
86    }
87
88    /// Returns a point-in-time snapshot of all collected statistics.
89    ///
90    /// Acquires a read lock on the internal state, computes derived metrics (averages,
91    /// rates), and returns a fully-owned [`GlobalSnapshot`]. Subsequent mutations to the
92    /// tracker are not reflected in an already-obtained snapshot.
93    ///
94    /// # Returns
95    ///
96    /// A [`GlobalSnapshot`] containing aggregate statistics for downloads, fetches,
97    /// post-processing, and playlists at the time of the call.
98    pub async fn snapshot(&self) -> GlobalSnapshot {
99        let inner = self.inner.read().await;
100        build_snapshot(&inner)
101    }
102
103    /// Returns the number of downloads currently in progress.
104    ///
105    /// # Returns
106    ///
107    /// The count of downloads that have been started but have not yet reached a terminal
108    /// state (completed, failed, or canceled).
109    pub async fn active_count(&self) -> usize {
110        self.inner.read().await.in_progress.len()
111    }
112
113    /// Returns the total number of successfully completed downloads.
114    ///
115    /// # Returns
116    ///
117    /// Cumulative count of downloads that ended with a [`DownloadCompleted`](crate::events::DownloadEvent::DownloadCompleted) event.
118    pub async fn completed_count(&self) -> u64 {
119        self.inner.read().await.completed
120    }
121
122    /// Returns the total number of bytes transferred across all completed downloads.
123    ///
124    /// # Returns
125    ///
126    /// Sum of `total_bytes` from every [`DownloadCompleted`](crate::events::DownloadEvent::DownloadCompleted) event received.
127    pub async fn total_bytes(&self) -> u64 {
128        self.inner.read().await.total_bytes
129    }
130
131    /// Resets all counters and history to their initial state, preserving the configuration.
132    ///
133    /// This is useful for implementing rolling windows or clearing statistics between
134    /// logical phases of an application. The tracker continues running and will start
135    /// collecting fresh data immediately after the reset.
136    pub async fn reset(&self) {
137        tracing::debug!("📊 Resetting statistics tracker");
138
139        let mut inner = self.inner.write().await;
140        let config = inner.config;
141        *inner = StatsInner::new(config);
142    }
143}
144
145impl std::fmt::Debug for StatisticsTracker {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        f.debug_struct("StatisticsTracker").finish_non_exhaustive()
148    }
149}
150
151impl std::fmt::Display for StatisticsTracker {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        f.write_str("StatisticsTracker")
154    }
155}
156
157/// Background loop: receives events from the broadcast channel and updates `inner`.
158async fn run_event_loop(inner: Arc<RwLock<StatsInner>>, mut rx: tokio::sync::broadcast::Receiver<Arc<DownloadEvent>>) {
159    loop {
160        match rx.recv().await {
161            Ok(event) => {
162                let mut state = inner.write().await;
163                handle_event(&mut state, &event);
164            }
165            Err(RecvError::Lagged(missed)) => {
166                tracing::warn!(missed = missed, "Statistics tracker lagged, some events were missed");
167            }
168            Err(RecvError::Closed) => break,
169        }
170    }
171
172    tracing::debug!("📊 Statistics tracker event loop terminated");
173}
174
175/// Resolved fields extracted from an in-progress download record.
176///
177/// Produced by [`resolve_in_progress_record`] to avoid repeating the same
178/// `match record { Some(r) => ..., None => ... }` pattern in every terminal arm.
179struct ResolvedRecord {
180    url: String,
181    priority: DownloadPriority,
182    queue_wait: Option<Duration>,
183    peak_speed: f64,
184    elapsed: Option<Duration>,
185}
186
187/// Extracts timing and identity fields from an optional in-progress record.
188///
189/// Returns a fully-populated [`ResolvedRecord`] with zero-valued defaults when
190/// `record` is `None` (i.e. the download was not tracked — e.g. it was queued
191/// before the tracker started).
192///
193/// # Arguments
194///
195/// * `record` - The removed in-progress entry, or `None` if not found.
196///
197/// # Returns
198///
199/// A [`ResolvedRecord`] with `url`, `priority`, `queue_wait`, `peak_speed`,
200/// and `elapsed` populated from the record, or defaulted to empty/`None`/`0.0`.
201fn resolve_in_progress_record(record: Option<InProgressDownload>) -> ResolvedRecord {
202    match record {
203        Some(r) => ResolvedRecord {
204            queue_wait: r.started_at.map(|s| s.duration_since(r.queued_at)),
205            elapsed: r.started_at.map(|s| s.elapsed()),
206            peak_speed: r.peak_speed,
207            url: r.url,
208            priority: r.priority,
209        },
210        None => ResolvedRecord {
211            url: String::new(),
212            priority: DownloadPriority::Normal,
213            queue_wait: None,
214            peak_speed: 0.0,
215            elapsed: None,
216        },
217    }
218}
219
220/// Applies a single event to the mutable state. No I/O or `.await` inside.
221fn handle_event(state: &mut StatsInner, event: &DownloadEvent) {
222    match event {
223        DownloadEvent::DownloadQueued {
224            download_id,
225            url,
226            priority,
227            ..
228        } => {
229            state.attempted += 1;
230            state.queued += 1;
231            state.in_progress.insert(
232                *download_id,
233                InProgressDownload {
234                    url: url.clone(),
235                    priority: *priority,
236                    queued_at: Instant::now(),
237                    started_at: None,
238                    peak_speed: 0.0,
239                    downloaded_bytes: 0,
240                    total_bytes: 0,
241                },
242            );
243
244            tracing::debug!(download_id = download_id, url = url, "📊 Download queued");
245        }
246
247        DownloadEvent::DownloadStarted {
248            download_id,
249            total_bytes,
250            ..
251        } => {
252            state.queued = state.queued.saturating_sub(1);
253
254            if let Some(entry) = state.in_progress.get_mut(download_id) {
255                entry.started_at = Some(Instant::now());
256                entry.total_bytes = *total_bytes;
257            }
258
259            tracing::debug!(
260                download_id = download_id,
261                total_bytes = total_bytes,
262                "📊 Download started"
263            );
264        }
265
266        DownloadEvent::DownloadProgress {
267            download_id,
268            downloaded_bytes,
269            total_bytes,
270            speed_bytes_per_sec,
271            ..
272        } => {
273            if let Some(entry) = state.in_progress.get_mut(download_id) {
274                entry.downloaded_bytes = *downloaded_bytes;
275                entry.total_bytes = *total_bytes;
276                if *speed_bytes_per_sec > entry.peak_speed {
277                    entry.peak_speed = *speed_bytes_per_sec;
278                }
279            }
280        }
281
282        DownloadEvent::DownloadCompleted {
283            download_id,
284            duration,
285            total_bytes,
286            ..
287        } => {
288            state.completed += 1;
289            state.total_bytes += total_bytes;
290            state.total_download_duration += *duration;
291
292            let rec = resolve_in_progress_record(state.in_progress.remove(download_id));
293
294            state.push_history(CompletedDownload {
295                download_id: *download_id,
296                url: rec.url,
297                priority: rec.priority,
298                outcome: DownloadOutcome::Completed,
299                bytes: *total_bytes,
300                duration: Some(*duration),
301                queue_wait: rec.queue_wait,
302                peak_speed: rec.peak_speed,
303                retry_count: 0,
304            });
305
306            tracing::debug!(
307                download_id = download_id,
308                total_bytes = total_bytes,
309                duration = ?duration,
310                "📊 Download completed"
311            );
312        }
313
314        DownloadEvent::DownloadFailed {
315            download_id,
316            retry_count,
317            ..
318        } => {
319            state.failed += 1;
320            state.total_retries += *retry_count as u64;
321
322            let rec = resolve_in_progress_record(state.in_progress.remove(download_id));
323
324            state.push_history(CompletedDownload {
325                download_id: *download_id,
326                url: rec.url,
327                priority: rec.priority,
328                outcome: DownloadOutcome::Failed,
329                bytes: 0,
330                duration: rec.elapsed,
331                queue_wait: rec.queue_wait,
332                peak_speed: rec.peak_speed,
333                retry_count: *retry_count,
334            });
335
336            tracing::debug!(
337                download_id = download_id,
338                retry_count = retry_count,
339                "📊 Download failed"
340            );
341        }
342
343        DownloadEvent::DownloadCanceled { download_id, .. } => {
344            state.canceled += 1;
345            if state.queued > 0 {
346                state.queued -= 1;
347            }
348
349            let rec = resolve_in_progress_record(state.in_progress.remove(download_id));
350
351            state.push_history(CompletedDownload {
352                download_id: *download_id,
353                url: rec.url,
354                priority: rec.priority,
355                outcome: DownloadOutcome::Canceled,
356                bytes: 0,
357                duration: None,
358                queue_wait: rec.queue_wait,
359                peak_speed: 0.0,
360                retry_count: 0,
361            });
362
363            tracing::debug!(download_id = download_id, "📊 Download canceled");
364        }
365
366        DownloadEvent::VideoFetched { url, duration, .. } => {
367            state.fetch_attempted += 1;
368            state.fetch_succeeded += 1;
369            state.total_fetch_duration += *duration;
370
371            tracing::debug!(
372                url = url,
373                duration = ?duration,
374                "📊 Video fetched"
375            );
376        }
377
378        DownloadEvent::VideoFetchFailed { url, duration, .. } => {
379            state.fetch_attempted += 1;
380            state.fetch_failed += 1;
381
382            tracing::debug!(
383                url = url,
384                duration = ?duration,
385                "📊 Video fetch failed"
386            );
387        }
388
389        DownloadEvent::PlaylistFetched {
390            url,
391            duration,
392            playlist,
393        } => {
394            state.fetch_attempted += 1;
395            state.fetch_succeeded += 1;
396            state.total_fetch_duration += *duration;
397            state.playlists_fetched += 1;
398
399            tracing::debug!(
400                url = url,
401                duration = ?duration,
402                playlist_id = %playlist.id,
403                "📊 Playlist fetched"
404            );
405        }
406
407        DownloadEvent::PlaylistFetchFailed { url, duration, .. } => {
408            state.fetch_attempted += 1;
409            state.fetch_failed += 1;
410            state.playlist_fetch_failed += 1;
411
412            tracing::debug!(
413                url = url,
414                duration = ?duration,
415                "📊 Playlist fetch failed"
416            );
417        }
418
419        DownloadEvent::PlaylistCompleted { successful, failed, .. } => {
420            state.playlist_items_successful += *successful as u64;
421            state.playlist_items_failed += *failed as u64;
422
423            tracing::debug!(successful = successful, failed = failed, "📊 Playlist completed");
424        }
425
426        DownloadEvent::PostProcessStarted { operation, .. } => {
427            state.postprocess_attempted += 1;
428
429            tracing::debug!(
430                operation = ?operation,
431                "📊 Post-process started"
432            );
433        }
434
435        DownloadEvent::PostProcessCompleted {
436            operation, duration, ..
437        } => {
438            state.postprocess_succeeded += 1;
439            state.total_postprocess_duration += *duration;
440
441            tracing::debug!(
442                operation = ?operation,
443                duration = ?duration,
444                "📊 Post-process completed"
445            );
446        }
447
448        DownloadEvent::PostProcessFailed { operation, error, .. } => {
449            state.postprocess_failed += 1;
450
451            tracing::debug!(
452                operation = ?operation,
453                error = error,
454                "📊 Post-process failed"
455            );
456        }
457
458        DownloadEvent::SegmentStarted { .. }
459        | DownloadEvent::SegmentCompleted { .. }
460        | DownloadEvent::FormatSelected { .. }
461        | DownloadEvent::MetadataApplied { .. }
462        | DownloadEvent::ChaptersEmbedded { .. }
463        | DownloadEvent::DownloadPaused { .. }
464        | DownloadEvent::DownloadResumed { .. }
465        | DownloadEvent::PlaylistItemStarted { .. }
466        | DownloadEvent::PlaylistItemCompleted { .. }
467        | DownloadEvent::PlaylistItemFailed { .. } => {
468            tracing::debug!(event = ?event, "📊 Untracked event, ignoring");
469        }
470
471        #[cfg(feature = "live-recording")]
472        DownloadEvent::LiveRecordingStarted { .. }
473        | DownloadEvent::LiveRecordingProgress { .. }
474        | DownloadEvent::LiveRecordingStopped { .. }
475        | DownloadEvent::LiveRecordingFailed { .. } => {
476            tracing::debug!(event = ?event, "📊 Live recording event, ignoring in stats");
477        }
478        #[cfg(feature = "live-streaming")]
479        DownloadEvent::LiveStreamStarted { .. }
480        | DownloadEvent::LiveStreamProgress { .. }
481        | DownloadEvent::LiveStreamStopped { .. }
482        | DownloadEvent::LiveStreamFailed { .. } => {
483            tracing::debug!(event = ?event, "📊 Live stream event, ignoring in stats");
484        }
485    }
486}
487
488/// Constructs a [`GlobalSnapshot`] from the current state. Called under a read lock.
489fn build_snapshot(state: &StatsInner) -> GlobalSnapshot {
490    let now = Instant::now();
491    let terminal = state.completed + state.failed + state.canceled;
492    let download_success_rate = if terminal > 0 {
493        Some(state.completed as f64 / terminal as f64)
494    } else {
495        None
496    };
497
498    let fetch_success_rate = if state.fetch_attempted > 0 {
499        Some(state.fetch_succeeded as f64 / state.fetch_attempted as f64)
500    } else {
501        None
502    };
503
504    let postprocess_success_rate = if state.postprocess_attempted > 0 {
505        Some(state.postprocess_succeeded as f64 / state.postprocess_attempted as f64)
506    } else {
507        None
508    };
509
510    let postprocess_avg_duration = if state.postprocess_succeeded > 0 {
511        Some(Duration::from_secs_f64(
512            state.total_postprocess_duration.as_secs_f64() / state.postprocess_succeeded as f64,
513        ))
514    } else {
515        None
516    };
517
518    let playlist_items_total = state.playlist_items_successful + state.playlist_items_failed;
519    let item_success_rate = if playlist_items_total > 0 {
520        Some(state.playlist_items_successful as f64 / playlist_items_total as f64)
521    } else {
522        None
523    };
524
525    let mut active_downloads: Vec<ActiveDownloadSnapshot> = state
526        .in_progress
527        .iter()
528        .map(|(id, r)| {
529            let progress = if r.total_bytes > 0 {
530                Some(r.downloaded_bytes as f64 / r.total_bytes as f64)
531            } else {
532                None
533            };
534            ActiveDownloadSnapshot {
535                download_id: *id,
536                url: r.url.clone(),
537                priority: r.priority,
538                downloaded_bytes: r.downloaded_bytes,
539                total_bytes: r.total_bytes,
540                progress,
541                peak_speed_bytes_per_sec: r.peak_speed,
542                elapsed: r.started_at.map(|s| now.duration_since(s)),
543                time_since_queued: now.duration_since(r.queued_at),
544            }
545        })
546        .collect();
547    active_downloads.sort_by_key(|e| e.download_id);
548
549    let recent_downloads: Vec<DownloadSnapshot> = state
550        .history
551        .iter()
552        .map(|r| DownloadSnapshot {
553            download_id: r.download_id,
554            url: r.url.clone(),
555            priority: r.priority,
556            outcome: match r.outcome {
557                DownloadOutcome::Completed => DownloadOutcomeSnapshot::Completed,
558                DownloadOutcome::Failed => DownloadOutcomeSnapshot::Failed,
559                DownloadOutcome::Canceled => DownloadOutcomeSnapshot::Canceled,
560            },
561            bytes: r.bytes,
562            duration: r.duration,
563            queue_wait: r.queue_wait,
564            peak_speed_bytes_per_sec: r.peak_speed,
565            retry_count: r.retry_count,
566        })
567        .collect();
568
569    GlobalSnapshot {
570        downloads: DownloadStats {
571            attempted: state.attempted,
572            completed: state.completed,
573            failed: state.failed,
574            canceled: state.canceled,
575            queued: state.queued,
576            total_bytes: state.total_bytes,
577            total_retries: state.total_retries,
578            total_duration: state.total_download_duration,
579            avg_duration: state.avg_download_duration(),
580            avg_speed_bytes_per_sec: state.avg_speed_bytes_per_sec(),
581            peak_speed_bytes_per_sec: state.peak_speed_bytes_per_sec(),
582            success_rate: download_success_rate,
583        },
584        fetches: FetchStats {
585            attempted: state.fetch_attempted,
586            succeeded: state.fetch_succeeded,
587            failed: state.fetch_failed,
588            avg_duration: state.avg_fetch_duration(),
589            success_rate: fetch_success_rate,
590        },
591        post_processing: PostProcessStats {
592            attempted: state.postprocess_attempted,
593            succeeded: state.postprocess_succeeded,
594            failed: state.postprocess_failed,
595            avg_duration: postprocess_avg_duration,
596            success_rate: postprocess_success_rate,
597        },
598        playlists: PlaylistStats {
599            playlists_fetched: state.playlists_fetched,
600            playlists_fetch_failed: state.playlist_fetch_failed,
601            items_successful: state.playlist_items_successful,
602            items_failed: state.playlist_items_failed,
603            item_success_rate,
604        },
605        active_count: active_downloads.len(),
606        active_downloads,
607        recent_downloads,
608    }
609}