1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use tokio::sync::RwLock;
5use tokio::sync::broadcast::error::RecvError;
6use tokio::task::JoinHandle;
7
8use super::config::TrackerConfig;
9use super::inner::{CompletedDownload, DownloadOutcome, InProgressDownload, StatsInner};
10use super::snapshot::{
11 ActiveDownloadSnapshot, DownloadOutcomeSnapshot, DownloadSnapshot, DownloadStats, FetchStats, GlobalSnapshot,
12 PlaylistStats, PostProcessStats,
13};
14use crate::download::DownloadPriority;
15use crate::events::{DownloadEvent, EventBus};
16
17pub struct StatisticsTracker {
44 inner: Arc<RwLock<StatsInner>>,
45 _task: JoinHandle<()>,
47}
48
49impl StatisticsTracker {
50 pub fn new(bus: &EventBus) -> Self {
60 Self::with_config(bus, TrackerConfig::default())
61 }
62
63 pub fn with_config(bus: &EventBus, config: TrackerConfig) -> Self {
74 tracing::debug!(
75 max_history = config.max_download_history,
76 "📊 Creating statistics tracker"
77 );
78
79 let inner = Arc::new(RwLock::new(StatsInner::new(config)));
80 let rx = bus.subscribe();
81 let inner_clone = inner.clone();
82
83 let task = tokio::spawn(run_event_loop(inner_clone, rx));
84
85 Self { inner, _task: task }
86 }
87
88 pub async fn snapshot(&self) -> GlobalSnapshot {
99 let inner = self.inner.read().await;
100 build_snapshot(&inner)
101 }
102
103 pub async fn active_count(&self) -> usize {
110 self.inner.read().await.in_progress.len()
111 }
112
113 pub async fn completed_count(&self) -> u64 {
119 self.inner.read().await.completed
120 }
121
122 pub async fn total_bytes(&self) -> u64 {
128 self.inner.read().await.total_bytes
129 }
130
131 pub async fn reset(&self) {
137 tracing::debug!("📊 Resetting statistics tracker");
138
139 let mut inner = self.inner.write().await;
140 let config = inner.config;
141 *inner = StatsInner::new(config);
142 }
143}
144
145impl std::fmt::Debug for StatisticsTracker {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 f.debug_struct("StatisticsTracker").finish_non_exhaustive()
148 }
149}
150
151impl std::fmt::Display for StatisticsTracker {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 f.write_str("StatisticsTracker")
154 }
155}
156
157async fn run_event_loop(inner: Arc<RwLock<StatsInner>>, mut rx: tokio::sync::broadcast::Receiver<Arc<DownloadEvent>>) {
159 loop {
160 match rx.recv().await {
161 Ok(event) => {
162 let mut state = inner.write().await;
163 handle_event(&mut state, &event);
164 }
165 Err(RecvError::Lagged(missed)) => {
166 tracing::warn!(missed = missed, "Statistics tracker lagged, some events were missed");
167 }
168 Err(RecvError::Closed) => break,
169 }
170 }
171
172 tracing::debug!("📊 Statistics tracker event loop terminated");
173}
174
175struct ResolvedRecord {
180 url: String,
181 priority: DownloadPriority,
182 queue_wait: Option<Duration>,
183 peak_speed: f64,
184 elapsed: Option<Duration>,
185}
186
187fn resolve_in_progress_record(record: Option<InProgressDownload>) -> ResolvedRecord {
202 match record {
203 Some(r) => ResolvedRecord {
204 queue_wait: r.started_at.map(|s| s.duration_since(r.queued_at)),
205 elapsed: r.started_at.map(|s| s.elapsed()),
206 peak_speed: r.peak_speed,
207 url: r.url,
208 priority: r.priority,
209 },
210 None => ResolvedRecord {
211 url: String::new(),
212 priority: DownloadPriority::Normal,
213 queue_wait: None,
214 peak_speed: 0.0,
215 elapsed: None,
216 },
217 }
218}
219
220fn handle_event(state: &mut StatsInner, event: &DownloadEvent) {
222 match event {
223 DownloadEvent::DownloadQueued {
224 download_id,
225 url,
226 priority,
227 ..
228 } => {
229 state.attempted += 1;
230 state.queued += 1;
231 state.in_progress.insert(
232 *download_id,
233 InProgressDownload {
234 url: url.clone(),
235 priority: *priority,
236 queued_at: Instant::now(),
237 started_at: None,
238 peak_speed: 0.0,
239 downloaded_bytes: 0,
240 total_bytes: 0,
241 },
242 );
243
244 tracing::debug!(download_id = download_id, url = url, "📊 Download queued");
245 }
246
247 DownloadEvent::DownloadStarted {
248 download_id,
249 total_bytes,
250 ..
251 } => {
252 state.queued = state.queued.saturating_sub(1);
253
254 if let Some(entry) = state.in_progress.get_mut(download_id) {
255 entry.started_at = Some(Instant::now());
256 entry.total_bytes = *total_bytes;
257 }
258
259 tracing::debug!(
260 download_id = download_id,
261 total_bytes = total_bytes,
262 "📊 Download started"
263 );
264 }
265
266 DownloadEvent::DownloadProgress {
267 download_id,
268 downloaded_bytes,
269 total_bytes,
270 speed_bytes_per_sec,
271 ..
272 } => {
273 if let Some(entry) = state.in_progress.get_mut(download_id) {
274 entry.downloaded_bytes = *downloaded_bytes;
275 entry.total_bytes = *total_bytes;
276 if *speed_bytes_per_sec > entry.peak_speed {
277 entry.peak_speed = *speed_bytes_per_sec;
278 }
279 }
280 }
281
282 DownloadEvent::DownloadCompleted {
283 download_id,
284 duration,
285 total_bytes,
286 ..
287 } => {
288 state.completed += 1;
289 state.total_bytes += total_bytes;
290 state.total_download_duration += *duration;
291
292 let rec = resolve_in_progress_record(state.in_progress.remove(download_id));
293
294 state.push_history(CompletedDownload {
295 download_id: *download_id,
296 url: rec.url,
297 priority: rec.priority,
298 outcome: DownloadOutcome::Completed,
299 bytes: *total_bytes,
300 duration: Some(*duration),
301 queue_wait: rec.queue_wait,
302 peak_speed: rec.peak_speed,
303 retry_count: 0,
304 });
305
306 tracing::debug!(
307 download_id = download_id,
308 total_bytes = total_bytes,
309 duration = ?duration,
310 "📊 Download completed"
311 );
312 }
313
314 DownloadEvent::DownloadFailed {
315 download_id,
316 retry_count,
317 ..
318 } => {
319 state.failed += 1;
320 state.total_retries += *retry_count as u64;
321
322 let rec = resolve_in_progress_record(state.in_progress.remove(download_id));
323
324 state.push_history(CompletedDownload {
325 download_id: *download_id,
326 url: rec.url,
327 priority: rec.priority,
328 outcome: DownloadOutcome::Failed,
329 bytes: 0,
330 duration: rec.elapsed,
331 queue_wait: rec.queue_wait,
332 peak_speed: rec.peak_speed,
333 retry_count: *retry_count,
334 });
335
336 tracing::debug!(
337 download_id = download_id,
338 retry_count = retry_count,
339 "📊 Download failed"
340 );
341 }
342
343 DownloadEvent::DownloadCanceled { download_id, .. } => {
344 state.canceled += 1;
345 if state.queued > 0 {
346 state.queued -= 1;
347 }
348
349 let rec = resolve_in_progress_record(state.in_progress.remove(download_id));
350
351 state.push_history(CompletedDownload {
352 download_id: *download_id,
353 url: rec.url,
354 priority: rec.priority,
355 outcome: DownloadOutcome::Canceled,
356 bytes: 0,
357 duration: None,
358 queue_wait: rec.queue_wait,
359 peak_speed: 0.0,
360 retry_count: 0,
361 });
362
363 tracing::debug!(download_id = download_id, "📊 Download canceled");
364 }
365
366 DownloadEvent::VideoFetched { url, duration, .. } => {
367 state.fetch_attempted += 1;
368 state.fetch_succeeded += 1;
369 state.total_fetch_duration += *duration;
370
371 tracing::debug!(
372 url = url,
373 duration = ?duration,
374 "📊 Video fetched"
375 );
376 }
377
378 DownloadEvent::VideoFetchFailed { url, duration, .. } => {
379 state.fetch_attempted += 1;
380 state.fetch_failed += 1;
381
382 tracing::debug!(
383 url = url,
384 duration = ?duration,
385 "📊 Video fetch failed"
386 );
387 }
388
389 DownloadEvent::PlaylistFetched {
390 url,
391 duration,
392 playlist,
393 } => {
394 state.fetch_attempted += 1;
395 state.fetch_succeeded += 1;
396 state.total_fetch_duration += *duration;
397 state.playlists_fetched += 1;
398
399 tracing::debug!(
400 url = url,
401 duration = ?duration,
402 playlist_id = %playlist.id,
403 "📊 Playlist fetched"
404 );
405 }
406
407 DownloadEvent::PlaylistFetchFailed { url, duration, .. } => {
408 state.fetch_attempted += 1;
409 state.fetch_failed += 1;
410 state.playlist_fetch_failed += 1;
411
412 tracing::debug!(
413 url = url,
414 duration = ?duration,
415 "📊 Playlist fetch failed"
416 );
417 }
418
419 DownloadEvent::PlaylistCompleted { successful, failed, .. } => {
420 state.playlist_items_successful += *successful as u64;
421 state.playlist_items_failed += *failed as u64;
422
423 tracing::debug!(successful = successful, failed = failed, "📊 Playlist completed");
424 }
425
426 DownloadEvent::PostProcessStarted { operation, .. } => {
427 state.postprocess_attempted += 1;
428
429 tracing::debug!(
430 operation = ?operation,
431 "📊 Post-process started"
432 );
433 }
434
435 DownloadEvent::PostProcessCompleted {
436 operation, duration, ..
437 } => {
438 state.postprocess_succeeded += 1;
439 state.total_postprocess_duration += *duration;
440
441 tracing::debug!(
442 operation = ?operation,
443 duration = ?duration,
444 "📊 Post-process completed"
445 );
446 }
447
448 DownloadEvent::PostProcessFailed { operation, error, .. } => {
449 state.postprocess_failed += 1;
450
451 tracing::debug!(
452 operation = ?operation,
453 error = error,
454 "📊 Post-process failed"
455 );
456 }
457
458 DownloadEvent::SegmentStarted { .. }
459 | DownloadEvent::SegmentCompleted { .. }
460 | DownloadEvent::FormatSelected { .. }
461 | DownloadEvent::MetadataApplied { .. }
462 | DownloadEvent::ChaptersEmbedded { .. }
463 | DownloadEvent::DownloadPaused { .. }
464 | DownloadEvent::DownloadResumed { .. }
465 | DownloadEvent::PlaylistItemStarted { .. }
466 | DownloadEvent::PlaylistItemCompleted { .. }
467 | DownloadEvent::PlaylistItemFailed { .. } => {
468 tracing::debug!(event = ?event, "📊 Untracked event, ignoring");
469 }
470
471 #[cfg(feature = "live-recording")]
472 DownloadEvent::LiveRecordingStarted { .. }
473 | DownloadEvent::LiveRecordingProgress { .. }
474 | DownloadEvent::LiveRecordingStopped { .. }
475 | DownloadEvent::LiveRecordingFailed { .. } => {
476 tracing::debug!(event = ?event, "📊 Live recording event, ignoring in stats");
477 }
478 #[cfg(feature = "live-streaming")]
479 DownloadEvent::LiveStreamStarted { .. }
480 | DownloadEvent::LiveStreamProgress { .. }
481 | DownloadEvent::LiveStreamStopped { .. }
482 | DownloadEvent::LiveStreamFailed { .. } => {
483 tracing::debug!(event = ?event, "📊 Live stream event, ignoring in stats");
484 }
485 }
486}
487
488fn build_snapshot(state: &StatsInner) -> GlobalSnapshot {
490 let now = Instant::now();
491 let terminal = state.completed + state.failed + state.canceled;
492 let download_success_rate = if terminal > 0 {
493 Some(state.completed as f64 / terminal as f64)
494 } else {
495 None
496 };
497
498 let fetch_success_rate = if state.fetch_attempted > 0 {
499 Some(state.fetch_succeeded as f64 / state.fetch_attempted as f64)
500 } else {
501 None
502 };
503
504 let postprocess_success_rate = if state.postprocess_attempted > 0 {
505 Some(state.postprocess_succeeded as f64 / state.postprocess_attempted as f64)
506 } else {
507 None
508 };
509
510 let postprocess_avg_duration = if state.postprocess_succeeded > 0 {
511 Some(Duration::from_secs_f64(
512 state.total_postprocess_duration.as_secs_f64() / state.postprocess_succeeded as f64,
513 ))
514 } else {
515 None
516 };
517
518 let playlist_items_total = state.playlist_items_successful + state.playlist_items_failed;
519 let item_success_rate = if playlist_items_total > 0 {
520 Some(state.playlist_items_successful as f64 / playlist_items_total as f64)
521 } else {
522 None
523 };
524
525 let mut active_downloads: Vec<ActiveDownloadSnapshot> = state
526 .in_progress
527 .iter()
528 .map(|(id, r)| {
529 let progress = if r.total_bytes > 0 {
530 Some(r.downloaded_bytes as f64 / r.total_bytes as f64)
531 } else {
532 None
533 };
534 ActiveDownloadSnapshot {
535 download_id: *id,
536 url: r.url.clone(),
537 priority: r.priority,
538 downloaded_bytes: r.downloaded_bytes,
539 total_bytes: r.total_bytes,
540 progress,
541 peak_speed_bytes_per_sec: r.peak_speed,
542 elapsed: r.started_at.map(|s| now.duration_since(s)),
543 time_since_queued: now.duration_since(r.queued_at),
544 }
545 })
546 .collect();
547 active_downloads.sort_by_key(|e| e.download_id);
548
549 let recent_downloads: Vec<DownloadSnapshot> = state
550 .history
551 .iter()
552 .map(|r| DownloadSnapshot {
553 download_id: r.download_id,
554 url: r.url.clone(),
555 priority: r.priority,
556 outcome: match r.outcome {
557 DownloadOutcome::Completed => DownloadOutcomeSnapshot::Completed,
558 DownloadOutcome::Failed => DownloadOutcomeSnapshot::Failed,
559 DownloadOutcome::Canceled => DownloadOutcomeSnapshot::Canceled,
560 },
561 bytes: r.bytes,
562 duration: r.duration,
563 queue_wait: r.queue_wait,
564 peak_speed_bytes_per_sec: r.peak_speed,
565 retry_count: r.retry_count,
566 })
567 .collect();
568
569 GlobalSnapshot {
570 downloads: DownloadStats {
571 attempted: state.attempted,
572 completed: state.completed,
573 failed: state.failed,
574 canceled: state.canceled,
575 queued: state.queued,
576 total_bytes: state.total_bytes,
577 total_retries: state.total_retries,
578 total_duration: state.total_download_duration,
579 avg_duration: state.avg_download_duration(),
580 avg_speed_bytes_per_sec: state.avg_speed_bytes_per_sec(),
581 peak_speed_bytes_per_sec: state.peak_speed_bytes_per_sec(),
582 success_rate: download_success_rate,
583 },
584 fetches: FetchStats {
585 attempted: state.fetch_attempted,
586 succeeded: state.fetch_succeeded,
587 failed: state.fetch_failed,
588 avg_duration: state.avg_fetch_duration(),
589 success_rate: fetch_success_rate,
590 },
591 post_processing: PostProcessStats {
592 attempted: state.postprocess_attempted,
593 succeeded: state.postprocess_succeeded,
594 failed: state.postprocess_failed,
595 avg_duration: postprocess_avg_duration,
596 success_rate: postprocess_success_rate,
597 },
598 playlists: PlaylistStats {
599 playlists_fetched: state.playlists_fetched,
600 playlists_fetch_failed: state.playlist_fetch_failed,
601 items_successful: state.playlist_items_successful,
602 items_failed: state.playlist_items_failed,
603 item_success_rate,
604 },
605 active_count: active_downloads.len(),
606 active_downloads,
607 recent_downloads,
608 }
609}