use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::time;
use tokio_util::sync::CancellationToken;
use super::hls;
use crate::error::{Error, Result};
use crate::events::{DownloadEvent, EventBus};
/// Minimum spacing, in nanoseconds, between two progress events emitted by `emit_live_progress`.
pub(super) const PROGRESS_THROTTLE_NANOS: u64 = 50_000_000;
/// Total number of attempts `LiveCore::fetch_segment` makes for one segment before giving up.
pub(super) const SEGMENT_RETRY_ATTEMPTS: u32 = 3;
/// Pause between two consecutive attempts in `LiveCore::fetch_segment`.
pub(super) const SEGMENT_RETRY_DELAY: Duration = Duration::from_millis(500);
/// Not referenced in the visible part of this file; presumably divides a segment or
/// target duration to obtain the playlist poll interval — TODO confirm against callers.
pub(super) const POLL_INTERVAL_DIVISOR: f64 = 2.0;
/// Factor that turns a byte count into bits when `emit_live_progress` computes the bitrate.
pub(super) const BITS_PER_BYTE: f64 = 8.0;
/// Named zero for `u64` values; not referenced in the visible part of this file.
pub(super) const ZERO_U64: u64 = 0;
/// Named zero for `f64` values; `emit_live_progress` uses it as the elapsed-time guard
/// and as the fallback bitrate.
pub(super) const ZERO_F64: f64 = 0.0;
/// Selects which error `LiveCore::segment_fetch_failed` builds when a segment request
/// comes back with a non-success HTTP status.
///
/// Each variant only exists when its Cargo feature is enabled.
#[derive(Debug, Clone, Copy)]
pub(super) enum SegmentErrorMode {
/// Failures are reported through `Error::live_recording`.
#[cfg(feature = "live-recording")]
Recording,
/// Failures are reported through `Error::live_segment_fetch_failed`.
#[cfg(feature = "live-streaming")]
Streaming,
}
/// One downloaded segment together with the playlist metadata it was fetched for.
///
/// Built by `LiveCore::fetch_fragment`.
#[derive(Debug, Clone)]
pub struct LiveFragment {
/// Sequence number copied from the playlist segment.
pub sequence: u64,
/// Segment duration, converted from the playlist segment's duration in seconds.
pub duration: Duration,
/// URL the segment was downloaded from.
pub url: String,
/// Response body of the segment request.
pub data: Vec<u8>,
}
/// Inputs for `LiveCore::new`; every field is moved into the `LiveCore` field of the
/// same name.
pub(super) struct LiveCoreConfig {
/// Playlist URL; presumably the HLS media playlist to poll — not used in the visible code.
pub(super) playlist_url: String,
/// Identifier of the video; not used in the visible code.
pub(super) video_id: String,
/// Quality label; not used in the visible code.
pub(super) quality: String,
/// Optional duration limit; presumably caps the session length — TODO confirm against callers.
pub(super) max_duration: Option<Duration>,
/// Cancellation token; not observed by any code visible here.
pub(super) cancellation_token: CancellationToken,
/// HTTP client used for segment requests.
pub(super) client: Arc<reqwest::Client>,
/// Event bus that receives progress events from `emit_live_progress`.
pub(super) event_bus: EventBus,
}
/// State bundle that carries the HTTP client, event bus and session parameters, and
/// provides the segment download helpers in its `impl`.
#[derive(Debug, Clone)]
pub(super) struct LiveCore {
/// Playlist URL; presumably the HLS media playlist to poll — not used in the visible code.
pub(super) playlist_url: String,
/// Identifier of the video; not used in the visible code.
pub(super) video_id: String,
/// Quality label; not used in the visible code.
pub(super) quality: String,
/// Optional duration limit; presumably caps the session length — TODO confirm against callers.
pub(super) max_duration: Option<Duration>,
/// Cancellation token; not observed by any code visible here.
pub(super) cancellation_token: CancellationToken,
/// HTTP client used by `fetch_segment_once` for segment requests.
pub(super) client: Arc<reqwest::Client>,
/// Event bus that receives progress events from `emit_live_progress`.
pub(super) event_bus: EventBus,
}
impl LiveCore {
pub(super) fn new(config: LiveCoreConfig) -> Self {
Self {
playlist_url: config.playlist_url,
video_id: config.video_id,
quality: config.quality,
max_duration: config.max_duration,
cancellation_token: config.cancellation_token,
client: config.client,
event_bus: config.event_bus,
}
}
pub(super) async fn fetch_segment(&self, url: &str, mode: SegmentErrorMode) -> Result<Vec<u8>> {
let mut last_error = None;
for attempt in 1..=SEGMENT_RETRY_ATTEMPTS {
match self.fetch_segment_once(url, mode).await {
Ok(data) => return Ok(data),
Err(e) => {
if attempt < SEGMENT_RETRY_ATTEMPTS {
tracing::warn!(
url = url,
attempt = attempt,
max_attempts = SEGMENT_RETRY_ATTEMPTS,
error = %e,
"Segment fetch failed, retrying"
);
time::sleep(SEGMENT_RETRY_DELAY).await;
}
last_error = Some(e);
}
}
}
Err(last_error.unwrap())
}
pub(super) async fn fetch_segment_once(&self, url: &str, mode: SegmentErrorMode) -> Result<Vec<u8>> {
let response = self
.client
.get(url)
.send()
.await
.map_err(|e| Error::http(url, "fetching HLS segment", e))?;
let status = response.status();
if !status.is_success() {
return Err(self.segment_fetch_failed(url, status, mode));
}
let bytes = response
.bytes()
.await
.map_err(|e| Error::http(url, "reading segment body", e))?;
Ok(bytes.to_vec())
}
pub(super) async fn fetch_fragment(
&self,
segment: &hls::HlsSegment,
mode: SegmentErrorMode,
) -> Result<LiveFragment> {
let data = self.fetch_segment(&segment.url, mode).await?;
Ok(LiveFragment {
sequence: segment.sequence,
duration: Duration::from_secs_f64(segment.duration),
url: segment.url.clone(),
data,
})
}
fn segment_fetch_failed(&self, url: &str, status: reqwest::StatusCode, mode: SegmentErrorMode) -> Error {
match mode {
#[cfg(feature = "live-recording")]
SegmentErrorMode::Recording => {
Error::live_recording(url, format!("segment fetch returned HTTP {}", status))
}
#[cfg(feature = "live-streaming")]
SegmentErrorMode::Streaming => Error::live_segment_fetch_failed(url, status),
}
}
}
/// Summary figures for a recording; only compiled with the `live-recording` feature.
///
/// Nothing in the visible part of this file constructs or reads this type, so the
/// field notes below are inferred from the names — TODO confirm against the recorder.
#[cfg(feature = "live-recording")]
#[derive(Debug, Clone)]
pub(super) struct RecordingStats {
/// Presumably the number of bytes recorded in total.
pub(super) total_bytes: u64,
/// Presumably the total duration covered by the recording.
pub(super) total_duration: Duration,
/// Presumably the number of segments downloaded.
pub(super) segments_downloaded: u64,
/// Presumably a human-readable reason why the recording ended.
pub(super) stop_reason: String,
}
/// Maximum number of sequence numbers `track_sequence` keeps in its window.
pub(super) const SEQUENCE_TRACK_WINDOW: usize = 512;

/// Records `sequence` as seen and keeps the bookkeeping bounded.
///
/// A sequence number that is not yet in `seen` is added to `seen` and appended to the
/// back of `window`. Afterwards, while `window` holds more than
/// [`SEQUENCE_TRACK_WINDOW`] entries, the oldest entries are taken off its front and
/// removed from `seen` as well.
pub(super) fn track_sequence(
    sequence: u64,
    seen: &mut std::collections::HashSet<u64>,
    window: &mut std::collections::VecDeque<u64>,
) {
    let newly_seen = seen.insert(sequence);
    if newly_seen {
        window.push_back(sequence);
    }

    // Number of entries beyond the allowed window size (zero when within bounds).
    let overflow = window.len().saturating_sub(SEQUENCE_TRACK_WINDOW);
    for oldest in window.drain(..overflow) {
        seen.remove(&oldest);
    }
}
/// Emits a throttled live-progress event on `core.event_bus`.
///
/// Nothing happens unless at least [`PROGRESS_THROTTLE_NANOS`] have passed since the
/// timestamp stored in `last_progress_nanos`. When an event is due, the timestamp is
/// updated, the byte counter is read, the average bitrate since `start` is computed,
/// and the event produced by `make_event` is handed to `emit_if_subscribed`.
///
/// # Parameters
/// * `core` – supplies the event bus.
/// * `start` – reference instant; elapsed time and throttle timestamps are measured
///   from it.
/// * `bytes_written` – running byte total, read with relaxed ordering.
/// * `segments_downloaded` – forwarded unchanged to `make_event`.
/// * `last_progress_nanos` – nanoseconds after `start` at which the previous event was
///   emitted; updated in place when an event is emitted.
/// * `make_event` – called with `(elapsed, total_bytes, segments_downloaded,
///   bitrate_bps)`; only invoked when an event is due.
pub(super) fn emit_live_progress(
    core: &LiveCore,
    start: Instant,
    bytes_written: &AtomicU64,
    segments_downloaded: u64,
    last_progress_nanos: &mut u64,
    make_event: impl FnOnce(Duration, u64, u64, f64) -> DownloadEvent,
) {
    // Read the clock once so the throttle timestamp and the reported elapsed time agree.
    let elapsed = start.elapsed();
    // Truncation would need roughly 584 years of elapsed time, so the cast is fine.
    let now_nanos = elapsed.as_nanos() as u64;

    // A stored timestamp ahead of `now_nanos` (e.g. the caller switched to a newer
    // `start`) is treated as "due" and resynchronised, instead of overflowing the
    // subtraction and panicking in debug builds.
    let due = now_nanos
        .checked_sub(*last_progress_nanos)
        .map_or(true, |since_last| since_last >= PROGRESS_THROTTLE_NANOS);
    if !due {
        return;
    }
    *last_progress_nanos = now_nanos;

    let total_bytes = bytes_written.load(Ordering::Relaxed);
    let elapsed_secs = elapsed.as_secs_f64();
    // Guard against dividing by zero right after `start`.
    let bitrate_bps = if elapsed_secs > ZERO_F64 {
        (total_bytes as f64 * BITS_PER_BYTE) / elapsed_secs
    } else {
        ZERO_F64
    };

    core.event_bus
        .emit_if_subscribed(make_event(elapsed, total_bytes, segments_downloaded, bitrate_bps));
}