Skip to main content

kithara_stream/dl/
downloader.rs

1use std::sync::{Arc, atomic::AtomicUsize};
2
3use futures::task::AtomicWaker;
4use kithara_abr::{Abr, AbrController, AbrPeerId};
5use kithara_events::EventBus;
6use kithara_net::HttpClient;
7use kithara_platform::{Mutex, RwLock, time::Duration, tokio, tokio::sync::mpsc};
8use kithara_test_utils::kithara;
9use tokio_util::sync::CancellationToken;
10
11use super::{
12    peer::{Peer, PeerHandle},
13    registry::{FetchProgress, Registry},
14};
15
/// Maximum number of `InternalCmd`s that may sit, unreceived, in a single
/// peer's command channel (created by `Downloader::register`) before the
/// sending `PeerHandle` experiences backpressure.
const PEER_CMD_CHANNEL_CAPACITY: usize = 32;
18
19/// Unified downloader — sole HTTP client owner and fetch orchestrator.
20///
21/// Created once at the application level, then shared (via [`Clone`]) across
22/// protocol configs. Owns the [`HttpClient`] and the runtime handle.
23/// Protocols obtain a [`PeerHandle`] via [`register`](Self::register) and
24/// issue fetches through [`PeerHandle::execute`].
25#[derive(Clone)]
26pub struct Downloader {
27    inner: Arc<DownloaderInner>,
28}
29
30impl std::fmt::Debug for Downloader {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        f.debug_struct("Downloader").finish_non_exhaustive()
33    }
34}
35
/// Peer registration entry sent to the download loop.
///
/// Built by [`Downloader::register`] and pushed through the unbounded
/// `register_tx` channel; the run loop polls that channel inside
/// `Registry::tick`, which takes ownership of the entry.
///
/// NOTE: fields are dropped in declaration order — keep that in mind
/// before reordering them.
pub(super) struct RegisteredPeerEntry {
    /// ABR peer identifier assigned at registration. The Registry stamps
    /// every proactively-scheduled `InternalCmd` with this id so the
    /// Downloader can credit bandwidth samples to the right peer.
    pub(super) peer_id: AbrPeerId,
    /// Shared bus reference, initially `None`. Written by
    /// [`PeerHandle::with_bus`](super::peer::PeerHandle::with_bus), read
    /// by the Registry when dispatching fetches so that
    /// `DownloaderEvent::LoadSlow` lands on the owning track's bus.
    pub(super) bus: Arc<RwLock<Option<EventBus>>>,
    /// The registered peer. The same allocation is also registered with
    /// the downloader's shared ABR controller (as an `Arc<dyn Abr>`).
    pub(super) peer: Arc<dyn Peer>,
    /// Handle's cancel token — a child of the downloader's root token.
    /// Fires from `PeerInner::Drop` when the last
    /// `PeerHandle` clone is released, letting the [`Registry`] drop the
    /// peer entry (and its `Arc<dyn Peer>`).
    pub(super) cancel: CancellationToken,
    /// Receiving half of this peer's bounded command channel (capacity
    /// `PEER_CMD_CHANNEL_CAPACITY`); the sending half is owned by the
    /// `PeerHandle` returned from `register`.
    pub(super) cmd_rx: mpsc::Receiver<super::peer::InternalCmd>,
}
54
/// Shared inner state for the downloader.
///
/// Both [`Downloader`] and [`PeerHandle`] hold an `Arc` to this; cloning
/// either is just an Arc bump. Dropping the last `Arc` cancels `cancel`
/// (see this type's `Drop` impl).
///
/// NOTE: fields are dropped in declaration order — keep that in mind
/// before reordering them.
pub(super) struct DownloaderInner {
    /// Shared ABR controller. One per Downloader — peers register through
    /// `register()` and fetch-completion hooks call
    /// `controller.record_bandwidth(...)` automatically.
    pub(super) abr: Arc<AbrController>,
    /// Waker fired when a fetch task completes (inflight decremented).
    /// Wakes `poll_fn` in `Registry::tick` so it can spawn more work.
    pub(super) fetch_waker: Arc<AtomicWaker>,
    /// Global in-flight fetch counter. Limits total concurrent HTTP
    /// connections across all peers and command types.
    pub(super) inflight: Arc<AtomicUsize>,
    /// Root cancellation token, taken from `DownloaderConfig::cancel`.
    /// Every peer's token is a `child_token()` of it, the run loop returns
    /// as soon as it fires, and dropping this struct cancels it.
    pub(super) cancel: CancellationToken,
    /// Copied from the HTTP client's `inactivity_timeout` option when the
    /// downloader is constructed.
    pub(super) chunk_timeout: Duration,
    /// Copied from `DownloaderConfig::demand_throttle`.
    /// NOTE(review): consumed by the Registry — exact semantics not visible here.
    pub(super) demand_throttle: Duration,
    /// Copied from `DownloaderConfig::soft_timeout`.
    /// NOTE(review): presumably the threshold that triggers
    /// `DownloaderEvent::LoadSlow` — confirm in the Registry.
    pub(super) soft_timeout: Duration,
    /// The HTTP client adopted from `DownloaderConfig::client`.
    pub(super) client: HttpClient,
    /// Receiving side of the registration channel; taken by
    /// [`ensure_spawned`](Downloader::ensure_spawned) and handed to the
    /// run-loop spawner.
    pub(super) register_rx: Mutex<Option<mpsc::UnboundedReceiver<RegisteredPeerEntry>>>,
    /// Runtime handle from `DownloaderConfig::runtime`. When `None`, the
    /// run loop is spawned on the ambient Tokio runtime
    /// (`Handle::try_current`), if there is one.
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) runtime: Option<tokio::runtime::Handle>,
    /// Sender for registering new peers (cold path).
    pub(super) register_tx: mpsc::UnboundedSender<RegisteredPeerEntry>,
    /// Copied from `DownloaderConfig::max_concurrent`.
    /// NOTE(review): presumably the cap that `inflight` is compared against.
    pub(super) max_concurrent: usize,
    /// Monotonic source of [`kithara_events::RequestId`]s assigned to
    /// every command this Downloader accepts. Starts at 1 (`NonZero`
    /// invariant); never wraps in practice (`u64`).
    next_request_id: std::sync::atomic::AtomicU64,
}
87
88impl DownloaderInner {
89    /// Allocate a fresh [`kithara_events::RequestId`].
90    pub(super) fn next_request_id(&self) -> kithara_events::RequestId {
91        let raw = self
92            .next_request_id
93            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
94        let nz = std::num::NonZeroU64::new(raw.max(1))
95            .expect("BUG: next_request_id starts at 1; fetch_add never yields 0");
96        kithara_events::RequestId::new(nz)
97    }
98}
99
100impl Downloader {
101    /// Hang-detector timeout for the downloader run loop. 60 s covers
102    /// the worst legitimate quiet period (single peer waiting on a
103    /// slow first byte) without delaying real-deadlock diagnostics.
104    const HANG_TIMEOUT: Duration = Duration::from_secs(60);
105
106    /// Create a new downloader from configuration.
107    ///
108    /// Adopts `config.client` (a clone of the caller's [`HttpClient`])
109    /// and the shared [`AbrController`] from `config.abr_settings`.
110    #[must_use]
111    pub fn new(config: super::DownloaderConfig) -> Self {
112        let (tx, rx) = mpsc::unbounded_channel();
113        let chunk_timeout = config.client.options().inactivity_timeout;
114        let soft_timeout = config.soft_timeout;
115        #[cfg(not(target_arch = "wasm32"))]
116        let runtime = config.runtime;
117        let abr = AbrController::new(config.abr_settings);
118        Self {
119            inner: Arc::new(DownloaderInner {
120                chunk_timeout,
121                soft_timeout,
122                #[cfg(not(target_arch = "wasm32"))]
123                runtime,
124                abr,
125                client: config.client,
126                cancel: config.cancel,
127                max_concurrent: config.max_concurrent,
128                demand_throttle: config.demand_throttle,
129                inflight: Arc::new(AtomicUsize::new(0)),
130                fetch_waker: Arc::new(AtomicWaker::new()),
131                register_tx: tx,
132                register_rx: Mutex::new(Some(rx)),
133                next_request_id: std::sync::atomic::AtomicU64::new(1),
134            }),
135        }
136    }
137
138    /// Ensure the download loop is running (lazy spawn on first register
139    /// in an async-capable context).
140    fn ensure_spawned(&self) {
141        let Some(rx) = self.inner.register_rx.lock_sync().take() else {
142            return;
143        };
144        let this = self.clone();
145        Self::spawn_run(&self.inner, this, rx);
146    }
147
148    /// Register a peer and return its [`PeerHandle`].
149    ///
150    /// Double-registers the peer: fetch channel through the download loop
151    /// and ABR state through the shared controller. The returned handle's
152    /// `Drop` unregisters both.
153    pub fn register(&self, peer: Arc<dyn Peer>) -> PeerHandle {
154        self.ensure_spawned();
155        let cancel = self.inner.cancel.child_token();
156        let (cmd_tx, cmd_rx) = mpsc::channel(PEER_CMD_CHANNEL_CAPACITY);
157        let bus: Arc<RwLock<Option<EventBus>>> = Arc::new(RwLock::new(None));
158
159        let abr_peer: Arc<dyn Abr> = Arc::clone(&peer) as Arc<dyn Abr>;
160        let abr_handle = self.inner.abr.register(&abr_peer);
161        let peer_id = abr_handle.peer_id();
162
163        let entry = RegisteredPeerEntry {
164            peer,
165            cmd_rx,
166            peer_id,
167            cancel: cancel.clone(),
168            bus: Arc::clone(&bus),
169        };
170        self.inner.register_tx.send(entry).ok();
171        PeerHandle::new(Arc::clone(&self.inner), cancel, cmd_tx, bus, abr_handle)
172    }
173
174    /// Download loop.
175    ///
176    /// Drives the [`Registry`] which owns peers, routes commands through
177    /// a 2×2 priority slot map, and executes batches via [`BatchGroup`].
178    ///
179    /// Registrations are polled inside `tick()` so that `process()` is
180    /// never dropped mid-batch by a competing `select!` arm (cancellation-
181    /// safety: dropping `process()` loses unspawned `FetchCmd`s whose
182    /// `on_complete` callbacks will never fire).
183    ///
184    /// # Deadlock detection
185    ///
186    /// `tick` reports a [`FetchProgress`](super::registry::FetchProgress):
187    /// - `Advanced` — something moved (cmd drained, peer yielded a batch,
188    ///   urgent/demand batch processed, or inflight changed). Reset.
189    /// - `Idle` — nothing to do: no queued cmds, no in-flight, peers
190    ///   pending. Watchdog left as-is; legitimate quiet period.
191    /// - `Stalled` — work exists (queued cmds or inflight > 0) but no
192    ///   forward motion this tick. Tick the watchdog; N consecutive
193    ///   stalls across the timeout window → panic.
194    #[kithara::hang_watchdog(timeout = Self::HANG_TIMEOUT)]
195    async fn run(&self, mut register_rx: mpsc::UnboundedReceiver<RegisteredPeerEntry>) {
196        let mut registry = Registry::default();
197
198        loop {
199            let progress = tokio::select! {
200                biased;
201                () = self.inner.cancel.cancelled() => return,
202                p = registry.tick(&self.inner, &mut register_rx) => p,
203            };
204
205            match progress {
206                FetchProgress::Advanced => {
207                    hang_reset!();
208                }
209                FetchProgress::Stalled => {
210                    hang_tick!();
211                }
212                FetchProgress::Idle => {}
213            }
214
215            registry.reschedule();
216        }
217    }
218
219    #[cfg(not(target_arch = "wasm32"))]
220    fn spawn_run(
221        inner: &DownloaderInner,
222        this: Self,
223        rx: mpsc::UnboundedReceiver<RegisteredPeerEntry>,
224    ) {
225        let Some(handle) = inner
226            .runtime
227            .clone()
228            .or_else(|| tokio::runtime::Handle::try_current().ok())
229        else {
230            return;
231        };
232        handle.spawn(async move { this.run(rx).await });
233    }
234
235    #[cfg(target_arch = "wasm32")]
236    fn spawn_run(
237        _inner: &DownloaderInner,
238        this: Self,
239        rx: mpsc::UnboundedReceiver<RegisteredPeerEntry>,
240    ) {
241        drop(tokio::task::spawn(async move {
242            this.run(rx).await;
243        }));
244    }
245}
246
247impl Drop for DownloaderInner {
248    fn drop(&mut self) {
249        self.cancel.cancel();
250    }
251}