use std::sync::{Arc, atomic::AtomicUsize};
use futures::task::AtomicWaker;
use kithara_abr::{Abr, AbrController, AbrPeerId};
use kithara_events::EventBus;
use kithara_net::HttpClient;
use kithara_platform::{Mutex, RwLock, time::Duration, tokio, tokio::sync::mpsc};
use kithara_test_utils::kithara;
use tokio_util::sync::CancellationToken;
use super::{
peer::{Peer, PeerHandle},
registry::{FetchProgress, Registry},
};
/// Capacity of the bounded per-peer command channel created in `Downloader::register`.
const PEER_CMD_CHANNEL_CAPACITY: usize = 32;
/// Cloneable handle to a downloader.
///
/// Every clone points at the same `DownloaderInner` through an `Arc`, so cloning
/// only bumps a reference count.
#[derive(Clone)]
pub struct Downloader {
// Shared state; `DownloaderInner`'s `Drop` impl cancels the root token when the
// last `Arc` reference is released.
inner: Arc<DownloaderInner>,
}
/// Opaque `Debug` output: only the type name is printed, none of the inner state.
impl std::fmt::Debug for Downloader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut opaque = f.debug_struct("Downloader");
        // No fields are listed; the builder marks the output as non-exhaustive.
        opaque.finish_non_exhaustive()
    }
}
/// Registration record built by `Downloader::register` and sent over the
/// registration channel whose receiving half is handed to `Downloader::run`.
pub(super) struct RegisteredPeerEntry {
/// Identifier taken from the ABR controller's registration handle.
pub(super) peer_id: AbrPeerId,
/// Event-bus slot shared with the returned `PeerHandle`; created as `None`.
pub(super) bus: Arc<RwLock<Option<EventBus>>>,
/// The peer being registered.
pub(super) peer: Arc<dyn Peer>,
/// Child of the downloader's root cancellation token; the `PeerHandle` holds the same token.
pub(super) cancel: CancellationToken,
/// Receiving half of the peer's bounded command channel; the sender goes to the `PeerHandle`.
pub(super) cmd_rx: mpsc::Receiver<super::peer::InternalCmd>,
}
/// State shared by every `Downloader` clone (held behind an `Arc`).
pub(super) struct DownloaderInner {
/// ABR controller built from the config's `abr_settings`; peers are registered with it.
pub(super) abr: Arc<AbrController>,
/// Shared waker. NOTE(review): presumably used to wake the fetch loop; its users are outside this file.
pub(super) fetch_waker: Arc<AtomicWaker>,
/// Shared counter initialised to 0. NOTE(review): presumably the number of in-flight fetches; confirm against its users.
pub(super) inflight: Arc<AtomicUsize>,
/// Root cancellation token: it stops `Downloader::run`, is the parent of every peer token,
/// and is cancelled when this struct is dropped.
pub(super) cancel: CancellationToken,
/// Copied from the HTTP client's `inactivity_timeout` option at construction time.
pub(super) chunk_timeout: Duration,
/// Taken from the config's `demand_throttle`.
pub(super) demand_throttle: Duration,
/// Taken from the config's `soft_timeout`.
pub(super) soft_timeout: Duration,
/// HTTP client supplied by the config.
pub(super) client: HttpClient,
/// Receiving half of the registration channel; taken (leaving `None`) by `Downloader::ensure_spawned`.
pub(super) register_rx: Mutex<Option<mpsc::UnboundedReceiver<RegisteredPeerEntry>>>,
/// Optional runtime handle from the config used to spawn `Downloader::run`;
/// when `None`, `Handle::try_current()` is tried instead.
#[cfg(not(target_arch = "wasm32"))]
pub(super) runtime: Option<tokio::runtime::Handle>,
/// Sending half of the registration channel, used by `Downloader::register`.
pub(super) register_tx: mpsc::UnboundedSender<RegisteredPeerEntry>,
/// Taken from the config's `max_concurrent`.
pub(super) max_concurrent: usize,
/// Counter backing `next_request_id()`; starts at 1.
next_request_id: std::sync::atomic::AtomicU64,
}
impl DownloaderInner {
    /// Hands out the next request identifier.
    ///
    /// The counter is advanced with a relaxed atomic `fetch_add`. A raw value of
    /// zero is clamped to one so the identifier is always non-zero.
    pub(super) fn next_request_id(&self) -> kithara_events::RequestId {
        use std::num::NonZeroU64;
        use std::sync::atomic::Ordering;

        let ticket = self.next_request_id.fetch_add(1, Ordering::Relaxed);
        let clamped = if ticket == 0 { 1 } else { ticket };
        let id = NonZeroU64::new(clamped)
            .expect("BUG: next_request_id starts at 1; fetch_add never yields 0");
        kithara_events::RequestId::new(id)
    }
}
impl Downloader {
/// Timeout handed to the `hang_watchdog` attribute on `run` (60 seconds).
const HANG_TIMEOUT: Duration = Duration::from_secs(60);
/// Builds a downloader from `config`.
///
/// Only state is created here: the registration channel, the ABR controller
/// and the shared counters. The background loop is not started by this
/// function; `register` triggers that through `ensure_spawned`.
#[must_use]
pub fn new(config: super::DownloaderConfig) -> Self {
    // Read the client's inactivity timeout before `config.client` is moved below.
    let chunk_timeout = config.client.options().inactivity_timeout;
    let (register_tx, register_rx) = mpsc::unbounded_channel();

    let inner = DownloaderInner {
        abr: AbrController::new(config.abr_settings),
        fetch_waker: Arc::new(AtomicWaker::new()),
        inflight: Arc::new(AtomicUsize::new(0)),
        cancel: config.cancel,
        chunk_timeout,
        demand_throttle: config.demand_throttle,
        soft_timeout: config.soft_timeout,
        client: config.client,
        register_rx: Mutex::new(Some(register_rx)),
        #[cfg(not(target_arch = "wasm32"))]
        runtime: config.runtime,
        register_tx,
        max_concurrent: config.max_concurrent,
        // Request ids start at 1 so they are never zero.
        next_request_id: std::sync::atomic::AtomicU64::new(1),
    };

    Self {
        inner: Arc::new(inner),
    }
}
fn ensure_spawned(&self) {
let Some(rx) = self.inner.register_rx.lock_sync().take() else {
return;
};
let this = self.clone();
Self::spawn_run(&self.inner, this, rx);
}
/// Registers `peer` with the downloader and returns the handle that controls it.
///
/// In order, this:
/// 1. makes sure the background loop has been started,
/// 2. derives a child cancellation token from the downloader's root token,
/// 3. creates the bounded command channel and an empty event-bus slot,
/// 4. registers the peer with the ABR controller,
/// 5. sends the resulting entry over the registration channel.
///
/// A failed send (the receiving half is gone) is ignored on purpose; the
/// handle is returned either way.
pub fn register(&self, peer: Arc<dyn Peer>) -> PeerHandle {
    self.ensure_spawned();

    let inner = &self.inner;
    let peer_cancel = inner.cancel.child_token();
    let (cmd_tx, cmd_rx) = mpsc::channel(PEER_CMD_CHANNEL_CAPACITY);
    let shared_bus: Arc<RwLock<Option<EventBus>>> = Arc::new(RwLock::new(None));

    // The ABR controller takes the peer through its `Abr` trait object.
    let as_abr: Arc<dyn Abr> = Arc::clone(&peer) as Arc<dyn Abr>;
    let abr_handle = inner.abr.register(&as_abr);
    let peer_id = abr_handle.peer_id();

    let record = RegisteredPeerEntry {
        peer_id,
        bus: Arc::clone(&shared_bus),
        peer,
        cancel: peer_cancel.clone(),
        cmd_rx,
    };
    // Best effort: the result is discarded.
    let _ = inner.register_tx.send(record);

    PeerHandle::new(Arc::clone(inner), peer_cancel, cmd_tx, shared_bus, abr_handle)
}
/// Background loop: drives `Registry::tick` repeatedly until the downloader's
/// root cancellation token fires.
///
/// `register_rx` is the receiving half of the registration channel; it is lent
/// to every `tick` call together with the shared inner state.
///
/// NOTE(review): `hang_reset!` / `hang_tick!` are not defined in this file —
/// presumably they are provided by the `hang_watchdog` attribute; confirm there.
#[kithara::hang_watchdog(timeout = Self::HANG_TIMEOUT)]
async fn run(&self, mut register_rx: mpsc::UnboundedReceiver<RegisteredPeerEntry>) {
let mut registry = Registry::default();
loop {
let progress = tokio::select! {
// `biased` makes `select!` poll the branches in source order, so
// cancellation is checked before the registry tick.
biased;
() = self.inner.cancel.cancelled() => return,
p = registry.tick(&self.inner, &mut register_rx) => p,
};
match progress {
// Progress was made: reset the watchdog.
FetchProgress::Advanced => {
hang_reset!();
}
// No progress: let the watchdog count this iteration.
FetchProgress::Stalled => {
hang_tick!();
}
// Idle iterations leave the watchdog untouched.
FetchProgress::Idle => {}
}
registry.reschedule();
}
}
/// Spawns the `run` loop on a Tokio runtime (non-wasm targets).
///
/// The runtime handle configured in `inner.runtime` is preferred; otherwise
/// the current runtime is used. If neither exists, the function returns
/// without spawning and both `this` and `rx` are dropped.
#[cfg(not(target_arch = "wasm32"))]
fn spawn_run(
    inner: &DownloaderInner,
    this: Self,
    rx: mpsc::UnboundedReceiver<RegisteredPeerEntry>,
) {
    let handle = match inner.runtime.clone() {
        Some(configured) => configured,
        None => match tokio::runtime::Handle::try_current() {
            Ok(ambient) => ambient,
            Err(_) => return,
        },
    };
    // The join handle is discarded, so the task runs detached.
    handle.spawn(async move { this.run(rx).await });
}
/// Spawns the `run` loop as a detached task (wasm32 targets).
///
/// `_inner` is unused here; the parameter keeps the signature identical to
/// the non-wasm variant.
#[cfg(target_arch = "wasm32")]
fn spawn_run(
    _inner: &DownloaderInner,
    this: Self,
    rx: mpsc::UnboundedReceiver<RegisteredPeerEntry>,
) {
    let task = tokio::task::spawn(async move {
        this.run(rx).await;
    });
    // The task handle is dropped explicitly rather than awaited.
    drop(task);
}
}
impl Drop for DownloaderInner {
fn drop(&mut self) {
self.cancel.cancel();
}
}