kithara_stream/dl/
downloader.rs1use std::sync::{Arc, atomic::AtomicUsize};
2
3use futures::task::AtomicWaker;
4use kithara_abr::{Abr, AbrController, AbrPeerId};
5use kithara_events::EventBus;
6use kithara_net::HttpClient;
7use kithara_platform::{Mutex, RwLock, time::Duration, tokio, tokio::sync::mpsc};
8use kithara_test_utils::kithara;
9use tokio_util::sync::CancellationToken;
10
11use super::{
12 peer::{Peer, PeerHandle},
13 registry::{FetchProgress, Registry},
14};
15
/// Capacity of the bounded command channel that `Downloader::register`
/// creates for each registered peer.
const PEER_CMD_CHANNEL_CAPACITY: usize = 32;
18
/// Handle to the downloader.
///
/// The handle only wraps an [`Arc`], so every clone refers to the same
/// [`DownloaderInner`] state.
#[derive(Clone)]
pub struct Downloader {
    /// State shared by all clones of this handle.
    inner: Arc<DownloaderInner>,
}
29
30impl std::fmt::Debug for Downloader {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 f.debug_struct("Downloader").finish_non_exhaustive()
33 }
34}
35
/// Registration record that `Downloader::register` sends over the
/// registration channel for every newly registered peer.
pub(super) struct RegisteredPeerEntry {
    /// Identifier taken from the ABR handle obtained when the peer is
    /// registered with the ABR controller.
    pub(super) peer_id: AbrPeerId,
    /// Event-bus slot shared with the peer's `PeerHandle`; `register`
    /// creates it holding `None`.
    pub(super) bus: Arc<RwLock<Option<EventBus>>>,
    /// The registered peer itself.
    pub(super) peer: Arc<dyn Peer>,
    /// Child of the downloader's cancellation token; a clone of it is handed
    /// to the peer's `PeerHandle`.
    pub(super) cancel: CancellationToken,
    /// Receiving half of the per-peer command channel; the sending half is
    /// handed to the peer's `PeerHandle`.
    pub(super) cmd_rx: mpsc::Receiver<super::peer::InternalCmd>,
}
54
/// State shared by every `Downloader` clone and passed to each `PeerHandle`.
///
/// Dropping the last reference cancels `cancel` (see the `Drop` impl).
pub(super) struct DownloaderInner {
    /// ABR controller built from the configuration's `abr_settings`; peers
    /// are registered with it in `Downloader::register`.
    pub(super) abr: Arc<AbrController>,
    /// Shared waker.
    /// NOTE(review): presumably wakes the run loop when fetch state changes —
    /// confirm against the registry.
    pub(super) fetch_waker: Arc<AtomicWaker>,
    /// Shared counter, initialised to 0.
    /// NOTE(review): presumably the number of in-flight fetches — confirm.
    pub(super) inflight: Arc<AtomicUsize>,
    /// Root cancellation token taken from the configuration; per-peer tokens
    /// are children of it and the run loop returns once it is cancelled.
    pub(super) cancel: CancellationToken,
    /// Copied from the HTTP client's `inactivity_timeout` option.
    pub(super) chunk_timeout: Duration,
    /// Taken verbatim from the configuration.
    pub(super) demand_throttle: Duration,
    /// Taken verbatim from the configuration.
    pub(super) soft_timeout: Duration,
    /// HTTP client taken from the configuration.
    pub(super) client: HttpClient,
    /// Receiving half of the registration channel. It starts as `Some` and is
    /// taken by `Downloader::ensure_spawned` when the run loop is spawned.
    pub(super) register_rx: Mutex<Option<mpsc::UnboundedReceiver<RegisteredPeerEntry>>>,
    /// Optional runtime handle from the configuration, preferred over the
    /// ambient runtime when spawning the run loop (native targets only).
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) runtime: Option<tokio::runtime::Handle>,
    /// Sending half of the registration channel, used by
    /// `Downloader::register`.
    pub(super) register_tx: mpsc::UnboundedSender<RegisteredPeerEntry>,
    /// Taken verbatim from the configuration.
    /// NOTE(review): presumably the cap on concurrent fetches — confirm.
    pub(super) max_concurrent: usize,
    /// Source of request identifiers; starts at 1.
    next_request_id: std::sync::atomic::AtomicU64,
}
87
88impl DownloaderInner {
89 pub(super) fn next_request_id(&self) -> kithara_events::RequestId {
91 let raw = self
92 .next_request_id
93 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
94 let nz = std::num::NonZeroU64::new(raw.max(1))
95 .expect("BUG: next_request_id starts at 1; fetch_add never yields 0");
96 kithara_events::RequestId::new(nz)
97 }
98}
99
100impl Downloader {
    /// Timeout handed to the `hang_watchdog` attribute on `run`.
    const HANG_TIMEOUT: Duration = Duration::from_secs(60);
105
106 #[must_use]
111 pub fn new(config: super::DownloaderConfig) -> Self {
112 let (tx, rx) = mpsc::unbounded_channel();
113 let chunk_timeout = config.client.options().inactivity_timeout;
114 let soft_timeout = config.soft_timeout;
115 #[cfg(not(target_arch = "wasm32"))]
116 let runtime = config.runtime;
117 let abr = AbrController::new(config.abr_settings);
118 Self {
119 inner: Arc::new(DownloaderInner {
120 chunk_timeout,
121 soft_timeout,
122 #[cfg(not(target_arch = "wasm32"))]
123 runtime,
124 abr,
125 client: config.client,
126 cancel: config.cancel,
127 max_concurrent: config.max_concurrent,
128 demand_throttle: config.demand_throttle,
129 inflight: Arc::new(AtomicUsize::new(0)),
130 fetch_waker: Arc::new(AtomicWaker::new()),
131 register_tx: tx,
132 register_rx: Mutex::new(Some(rx)),
133 next_request_id: std::sync::atomic::AtomicU64::new(1),
134 }),
135 }
136 }
137
138 fn ensure_spawned(&self) {
141 let Some(rx) = self.inner.register_rx.lock_sync().take() else {
142 return;
143 };
144 let this = self.clone();
145 Self::spawn_run(&self.inner, this, rx);
146 }
147
148 pub fn register(&self, peer: Arc<dyn Peer>) -> PeerHandle {
154 self.ensure_spawned();
155 let cancel = self.inner.cancel.child_token();
156 let (cmd_tx, cmd_rx) = mpsc::channel(PEER_CMD_CHANNEL_CAPACITY);
157 let bus: Arc<RwLock<Option<EventBus>>> = Arc::new(RwLock::new(None));
158
159 let abr_peer: Arc<dyn Abr> = Arc::clone(&peer) as Arc<dyn Abr>;
160 let abr_handle = self.inner.abr.register(&abr_peer);
161 let peer_id = abr_handle.peer_id();
162
163 let entry = RegisteredPeerEntry {
164 peer,
165 cmd_rx,
166 peer_id,
167 cancel: cancel.clone(),
168 bus: Arc::clone(&bus),
169 };
170 self.inner.register_tx.send(entry).ok();
171 PeerHandle::new(Arc::clone(&self.inner), cancel, cmd_tx, bus, abr_handle)
172 }
173
    /// Main loop of the downloader.
    ///
    /// Each iteration waits for one `Registry::tick`, unless the downloader's
    /// cancellation token fires first, in which case the function returns.
    /// The tick result drives the hang watchdog and the registry is then
    /// rescheduled.
    ///
    /// NOTE(review): `hang_reset!` and `hang_tick!` are not defined in this
    /// block; presumably they are provided by the `hang_watchdog` attribute —
    /// confirm against the macro.
    #[kithara::hang_watchdog(timeout = Self::HANG_TIMEOUT)]
    async fn run(&self, mut register_rx: mpsc::UnboundedReceiver<RegisteredPeerEntry>) {
        let mut registry = Registry::default();

        loop {
            // `biased` polls the arms in the order written, so cancellation
            // is checked before the tick.
            let progress = tokio::select! {
                biased;
                () = self.inner.cancel.cancelled() => return,
                p = registry.tick(&self.inner, &mut register_rx) => p,
            };

            match progress {
                // Progress was made: reset the watchdog.
                FetchProgress::Advanced => {
                    hang_reset!();
                }
                // No progress: tick the watchdog.
                FetchProgress::Stalled => {
                    hang_tick!();
                }
                // Idle ticks leave the watchdog untouched.
                FetchProgress::Idle => {}
            }

            registry.reschedule();
        }
    }
218
219 #[cfg(not(target_arch = "wasm32"))]
220 fn spawn_run(
221 inner: &DownloaderInner,
222 this: Self,
223 rx: mpsc::UnboundedReceiver<RegisteredPeerEntry>,
224 ) {
225 let Some(handle) = inner
226 .runtime
227 .clone()
228 .or_else(|| tokio::runtime::Handle::try_current().ok())
229 else {
230 return;
231 };
232 handle.spawn(async move { this.run(rx).await });
233 }
234
235 #[cfg(target_arch = "wasm32")]
236 fn spawn_run(
237 _inner: &DownloaderInner,
238 this: Self,
239 rx: mpsc::UnboundedReceiver<RegisteredPeerEntry>,
240 ) {
241 drop(tokio::task::spawn(async move {
242 this.run(rx).await;
243 }));
244 }
245}
246
247impl Drop for DownloaderInner {
248 fn drop(&mut self) {
249 self.cancel.cancel();
250 }
251}